From aef02d5e376f57f540a4d5998c0fa73361a9c900 Mon Sep 17 00:00:00 2001 From: Alberto Schiabel Date: Mon, 30 Dec 2024 16:58:29 +0100 Subject: [PATCH] chore(driver-adapters): refactor query-engine test executor (#5100) * chore(driver-adapters): refactor query-engine test executor, in preparation for schema-engine-wasm playground * chore: update Node.js version to v20 for Driver Adapters * feat(driver-adapters-executor): log request on "initializeeSchema"; add brand type + sanity check to SchemaId * feat(connector-test-kit-rs): use workspace root only, add warnings on config loading errors (hidden behind RUST_LOG=warn) * fix(driver-adapters-executor): schema path setting --------- Co-authored-by: jkomyno <12381818+jkomyno@users.noreply.github.com> --- .../test-driver-adapters-template.yml | 2 +- Cargo.lock | 96 +++++- query-engine/connector-test-kit-rs/README.md | 2 +- .../query-tests-setup/Cargo.toml | 2 + .../query-tests-setup/src/config.rs | 43 ++- .../driver-adapters/executor/package.json | 4 +- .../executor/script/testd-qe.sh | 2 + .../driver-adapters/executor/script/testd.sh | 2 - .../driver-adapters/executor/src/bench.ts | 261 ++++++++-------- .../src/driver-adapters-manager/neon.ws.ts | 4 +- .../src/driver-adapters-manager/pg.ts | 4 +- .../executor/src/engines/Library.ts | 2 +- .../executor/src/engines/QueryEngine.ts | 91 +++--- .../driver-adapters/executor/src/qe.ts | 76 ----- .../executor/src/query-engine-wasm.ts | 43 +++ .../executor/src/query-engine.ts | 73 +++++ .../driver-adapters/executor/src/recording.ts | 79 +++-- .../driver-adapters/executor/src/requestId.ts | 16 +- .../driver-adapters/executor/src/rn.ts | 64 ++-- .../driver-adapters/executor/src/setup.ts | 33 ++ .../driver-adapters/executor/src/testd-qe.ts | 243 +++++++++++++++ .../driver-adapters/executor/src/testd.ts | 285 ------------------ .../executor/src/types/jsonRpc.ts | 2 +- .../driver-adapters/executor/src/utils.ts | 34 ++- .../driver-adapters/executor/src/wasm.ts | 30 -- 25 files changed, 821 insertions(+), 672 deletions(-) create mode 100755 query-engine/driver-adapters/executor/script/testd-qe.sh delete mode 100755 query-engine/driver-adapters/executor/script/testd.sh delete mode 100644 query-engine/driver-adapters/executor/src/qe.ts create mode 100644 query-engine/driver-adapters/executor/src/query-engine-wasm.ts create mode 100644 query-engine/driver-adapters/executor/src/query-engine.ts create mode 100644 query-engine/driver-adapters/executor/src/setup.ts create mode 100644 query-engine/driver-adapters/executor/src/testd-qe.ts delete mode 100644 query-engine/driver-adapters/executor/src/testd.ts delete mode 100644 query-engine/driver-adapters/executor/src/wasm.ts diff --git a/.github/workflows/test-driver-adapters-template.yml b/.github/workflows/test-driver-adapters-template.yml index d0598355f375..d7451ddca8b1 100644 --- a/.github/workflows/test-driver-adapters-template.yml +++ b/.github/workflows/test-driver-adapters-template.yml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: false matrix: - node_version: ["18"] + node_version: ["20"] partition: ["1/4", "2/4", "3/4", "4/4"] env: LOG_LEVEL: "info" # Set to "debug" to trace the query engine and node process running the driver adapter diff --git a/Cargo.lock b/Cargo.lock index fc18c125a92c..cb9d754e2add 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,6 +86,55 @@ dependencies = [ "winapi", ] +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.72" @@ -653,6 +702,12 @@ dependencies = [ "criterion", ] +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "colored" version = "2.0.4" @@ -1281,6 +1336,29 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1880,6 +1958,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -2062,6 +2146,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.10.5" @@ -2286,9 +2376,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.19" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lru" @@ -3924,12 +4014,14 @@ dependencies = [ "async-trait", "colored", "enumflags2", + "env_logger", "hyper", "indexmap 2.2.2", "indoc 2.0.3", "insta", "itertools 0.12.0", "jsonrpc-core", + "log", "nom", "once_cell", "parse-hyperlinks", diff --git a/query-engine/connector-test-kit-rs/README.md b/query-engine/connector-test-kit-rs/README.md index ef8396f48045..440445cf5398 100644 --- a/query-engine/connector-test-kit-rs/README.md +++ b/query-engine/connector-test-kit-rs/README.md @@ -89,7 +89,7 @@ To run tests through a driver adapters, you should also configure the following Example: ```shell -export EXTERNAL_TEST_EXECUTOR="$WORKSPACE_ROOT/query-engine/driver-adapters/executor/script/testd.sh" +export EXTERNAL_TEST_EXECUTOR="$WORKSPACE_ROOT/query-engine/driver-adapters/executor/script/testd-qe.sh" export DRIVER_ADAPTER=neon export ENGINE=wasm export DRIVER_ADAPTER_CONFIG ='{ "proxyUrl": "127.0.0.1:5488/v1" }' diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml b/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml index b2016e602b9c..6f564236fd87 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml +++ b/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml @@ -40,3 +40,5 @@ insta = "1.7.1" # as this is a small crate with little user base. parse-hyperlinks = "0.23.3" strip-ansi-escapes = "0.1.1" +log = "0.4.22" +env_logger = "0.11.6" diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/config.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/config.rs index f287f3e4782b..df4ac0a32437 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/config.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/config.rs @@ -2,6 +2,7 @@ use crate::{ CockroachDbConnectorTag, ConnectorTag, ConnectorVersion, MongoDbConnectorTag, MySqlConnectorTag, PostgresConnectorTag, SqlServerConnectorTag, SqliteConnectorTag, TestResult, VitessConnectorTag, }; +use log::warn; use qe_setup::driver_adapters::DriverAdapter; use serde::{Deserialize, Serialize}; use std::{convert::TryFrom, env, fmt::Display, fs::File, io::Read, path::PathBuf}; @@ -258,6 +259,8 @@ fn exit_with_message(msg: &str) -> ! { impl TestConfig { /// Loads a configuration. File-based config has precedence over env config. pub(crate) fn load() -> Self { + // Use `RUST_LOG=warn` to see warnings about config loading. + env_logger::init(); let config = match Self::from_file().or_else(Self::from_env) { Some(config) => config, None => exit_with_message(CONFIG_LOAD_FAILED), @@ -326,21 +329,35 @@ impl TestConfig { } fn from_file() -> Option { - let current_dir = env::current_dir().ok(); - current_dir - .and_then(|path| Self::try_path(config_path(path))) - .or_else(|| Self::workspace_root().and_then(|path| Self::try_path(config_path(path)))) + Self::workspace_root().and_then(|path| Self::try_path(config_path(path))) } fn try_path(path: PathBuf) -> Option { - File::open(path).ok().and_then(|mut f| { - let mut config = String::new(); - - f.read_to_string(&mut config) - .ok() - .and_then(|_| serde_json::from_str::(&config).ok()) - .map(Self::from) - }) + File::open(&path) + .map_err(move |err| { + warn!("Could not open file {}: {err}", path.display()); + err + }) + .ok() + .and_then(|mut f| { + let mut config = String::new(); + + f.read_to_string(&mut config) + .map_err(|err| { + warn!("Could not read file into string: {err}"); + err + }) + .ok() + .and_then(|_| { + serde_json::from_str::(&config) + .map_err(|err| { + warn!("Could not deserialize JSON via TestConfigFromSerde: {err}"); + err + }) + .ok() + }) + .map(Self::from) + }) } fn workspace_root() -> Option { @@ -348,7 +365,7 @@ impl TestConfig { } pub fn external_test_executor_path(&self) -> Option { - const DEFAULT_TEST_EXECUTOR: &str = "query-engine/driver-adapters/executor/script/testd.sh"; + const DEFAULT_TEST_EXECUTOR: &str = "query-engine/driver-adapters/executor/script/testd-qe.sh"; self.with_driver_adapter() .and_then(|_| { Self::workspace_root().or_else(|| { diff --git a/query-engine/driver-adapters/executor/package.json b/query-engine/driver-adapters/executor/package.json index 5d770fb484f0..016a5f0a6a59 100644 --- a/query-engine/driver-adapters/executor/package.json +++ b/query-engine/driver-adapters/executor/package.json @@ -8,8 +8,8 @@ "description": "", "private": true, "scripts": { - "build": "tsup ./src/testd.ts ./src/bench.ts --format esm --dts", - "test": "node --import tsx ./src/testd.ts", + "build": "tsup ./src/testd-qe.ts ./src/bench.ts --format esm --dts", + "test:qe": "node --import tsx ./src/testd-qe.ts", "clean:d1": "rm -rf ../../connector-test-kit-rs/query-engine-tests/.wrangler" }, "tsup": { diff --git a/query-engine/driver-adapters/executor/script/testd-qe.sh b/query-engine/driver-adapters/executor/script/testd-qe.sh new file mode 100755 index 000000000000..a5b1a27faa96 --- /dev/null +++ b/query-engine/driver-adapters/executor/script/testd-qe.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +node "$(dirname "${BASH_SOURCE[0]}")/../dist/testd-qe.mjs" diff --git a/query-engine/driver-adapters/executor/script/testd.sh b/query-engine/driver-adapters/executor/script/testd.sh deleted file mode 100755 index b61fb5deb981..000000000000 --- a/query-engine/driver-adapters/executor/script/testd.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env bash -node "$(dirname "${BASH_SOURCE[0]}")/../dist/testd.mjs" diff --git a/query-engine/driver-adapters/executor/src/bench.ts b/query-engine/driver-adapters/executor/src/bench.ts index 7aaccc20d488..7112ed5b8002 100644 --- a/query-engine/driver-adapters/executor/src/bench.ts +++ b/query-engine/driver-adapters/executor/src/bench.ts @@ -1,244 +1,269 @@ -/** - * Run with: `node --experimental-wasm-modules ./example.js` - * on Node.js 18+. - */ -import { webcrypto } from "node:crypto"; -import * as fs from "node:fs/promises"; -import path from "node:path"; +import * as fs from 'node:fs/promises' +import path from 'node:path' import { __dirname } from './utils' -import * as qe from "./qe"; +import * as qe from './query-engine' -import { pg } from "@prisma/bundled-js-drivers"; -import * as prismaPg from "@prisma/adapter-pg"; -import { bindAdapter, DriverAdapter } from "@prisma/driver-adapter-utils"; +import { pg } from '@prisma/bundled-js-drivers' +import * as prismaPg from '@prisma/adapter-pg' +import { bindAdapter, DriverAdapter } from '@prisma/driver-adapter-utils' -import { recording } from "./recording"; -import { nextRequestId } from "./requestId"; -import prismaQueries from "../bench/queries.json"; +import { recording } from './recording' +import { nextRequestId } from './requestId' +import prismaQueries from '../bench/queries.json' -import { run, bench, group, baseline } from "mitata"; +import { baseline, bench, group, run } from 'mitata' -import { QueryEngine as WasmBaseline } from "query-engine-wasm-baseline"; +import { QueryEngine as WasmBaseline } from 'query-engine-wasm-baseline' // `query-engine-wasm-latest` refers to the latest published version of the Wasm Query Engine, // rather than the latest locally built one. We're pulling in the Postgres Query Engine // because benchmarks are only run against a Postgres database. -import { QueryEngine as WasmLatest } from "query-engine-wasm-latest/postgresql/query_engine.js"; - -if (!global.crypto) { - (global as any).crypto = webcrypto; -} +import { QueryEngine as WasmLatest } from 'query-engine-wasm-latest/postgresql/query_engine.js' async function main(): Promise { // read the prisma schema from stdin var datamodel = ( - await fs.readFile(path.resolve(__dirname, "..", "bench", "schema.prisma")) - ).toString(); + await fs.readFile(path.resolve(__dirname, '..', 'bench', 'schema.prisma')) + ).toString() - const url = process.env.DATABASE_URL; + const url = process.env.DATABASE_URL if (url == null) { - throw new Error("DATABASE_URL is not defined"); + throw new Error('DATABASE_URL is not defined') } - const pg = await pgAdapter(url); - const withErrorCapturing = bindAdapter(pg); + const pg = await pgAdapter(url) + const withErrorCapturing = bindAdapter(pg) // We build two decorators for recording and replaying db queries. - const { recorder, replayer, recordings } = recording(withErrorCapturing); + const { recorder, replayer, recordings } = recording(withErrorCapturing) // We exercise the queries recording them - await recordQueries(recorder, datamodel, prismaQueries); + await recordQueries(recorder, datamodel, prismaQueries) // Dump recordings if requested if (process.env.BENCH_RECORDINGS_FILE != null) { - const recordingsJson = JSON.stringify(recordings.data(), null, 2); - await fs.writeFile(process.env.BENCH_RECORDINGS_FILE, recordingsJson); - debug(`Recordings written to ${process.env.BENCH_RECORDINGS_FILE}`); + const recordingsJson = JSON.stringify(recordings.data(), null, 2) + await fs.writeFile(process.env.BENCH_RECORDINGS_FILE, recordingsJson) + debug(`Recordings written to ${process.env.BENCH_RECORDINGS_FILE}`) } // Then we benchmark the execution of the queries but instead of hitting the DB // we fetch results from the recordings, thus isolating the performance // of the engine + driver adapter code from that of the DB IO. - await benchMarkQueries(replayer, datamodel, prismaQueries); + await benchMarkQueries(replayer, datamodel, prismaQueries) } async function recordQueries( adapter: DriverAdapter, datamodel: string, - prismaQueries: any + prismaQueries: any, ): Promise { // Different engines might have made different SQL queries to complete the same Prisma Query, // so we record the results of all engines for the benchmarking phase. - const napi = await initQeNapiCurrent(adapter, datamodel); - await napi.connect("", nextRequestId()); - const wasmCurrent = await initQeWasmCurrent(adapter, datamodel); - await wasmCurrent.connect("", nextRequestId()); - const wasmBaseline = await initQeWasmBaseLine(adapter, datamodel); - await wasmBaseline.connect("", nextRequestId()); - const wasmLatest = await initQeWasmLatest(adapter, datamodel); - await wasmLatest.connect("", nextRequestId()); + const napi = await initQeNapiCurrent(adapter, datamodel) + await napi.connect('', nextRequestId()) + const wasmCurrent = await initQeWasmCurrent(adapter, datamodel) + await wasmCurrent.connect('', nextRequestId()) + const wasmBaseline = await initQeWasmBaseLine(adapter, datamodel) + await wasmBaseline.connect('', nextRequestId()) + const wasmLatest = await initQeWasmLatest(adapter, datamodel) + await wasmLatest.connect('', nextRequestId()) try { for (const qe of [napi, wasmCurrent, wasmBaseline, wasmLatest]) { for (const prismaQuery of prismaQueries) { - const { description, query } = prismaQuery; - const res = await qe.query(JSON.stringify(query), "", undefined, nextRequestId()); - console.log(res[9]); - - const errors = JSON.parse(res).errors; + const { description, query } = prismaQuery + const res = await qe.query( + JSON.stringify(query), + '', + undefined, + nextRequestId(), + ) + console.log(res[9]) + + const errors = JSON.parse(res).errors if (errors != null) { throw new Error( - `Query failed for ${description}: ${JSON.stringify(res)}` - ); + `Query failed for ${description}: ${JSON.stringify(res)}`, + ) } } } } finally { - await napi.disconnect("", nextRequestId()); - await wasmCurrent.disconnect("", nextRequestId()); - await wasmBaseline.disconnect("", nextRequestId()); - await wasmLatest.disconnect("", nextRequestId()); + await napi.disconnect('', nextRequestId()) + await wasmCurrent.disconnect('', nextRequestId()) + await wasmBaseline.disconnect('', nextRequestId()) + await wasmLatest.disconnect('', nextRequestId()) } } async function benchMarkQueries( adapter: DriverAdapter, datamodel: string, - prismaQueries: any + prismaQueries: any, ) { - const napi = await initQeNapiCurrent(adapter, datamodel); - await napi.connect("", nextRequestId()); - const wasmCurrent = await initQeWasmCurrent(adapter, datamodel); - await wasmCurrent.connect("", nextRequestId()); - const wasmBaseline = await initQeWasmBaseLine(adapter, datamodel); - await wasmBaseline.connect("", nextRequestId()); - const wasmLatest = await initQeWasmLatest(adapter, datamodel); - await wasmLatest.connect("", nextRequestId()); + const napi = await initQeNapiCurrent(adapter, datamodel) + await napi.connect('', nextRequestId()) + const wasmCurrent = await initQeWasmCurrent(adapter, datamodel) + await wasmCurrent.connect('', nextRequestId()) + const wasmBaseline = await initQeWasmBaseLine(adapter, datamodel) + await wasmBaseline.connect('', nextRequestId()) + const wasmLatest = await initQeWasmLatest(adapter, datamodel) + await wasmLatest.connect('', nextRequestId()) for (const prismaQuery of prismaQueries) { - const { description, query } = prismaQuery; + const { description, query } = prismaQuery const engines = { Napi: napi, - "WASM Current": wasmCurrent, - "WASM Baseline": wasmBaseline, - "WASM Latest": wasmLatest, - }; + 'WASM Current': wasmCurrent, + 'WASM Baseline': wasmBaseline, + 'WASM Latest': wasmLatest, + } for (const [engineName, engine] of Object.entries(engines)) { - const res = await engine.query(JSON.stringify(query), "", undefined, nextRequestId()); - const errors = JSON.parse(res).errors; + const res = await engine.query( + JSON.stringify(query), + '', + undefined, + nextRequestId(), + ) + const errors = JSON.parse(res).errors if (errors != null && errors.length > 0) { throw new Error( - `${engineName} - Query failed for ${description}: ${JSON.stringify( - res - )}` - ); + `${engineName} - Query failed for ${description}: ${ + JSON.stringify( + res, + ) + }`, + ) } } } try { for (const prismaQuery of prismaQueries) { - const { description, query } = prismaQuery; - const jsonQuery = JSON.stringify(query); - const irrelevantTraceId = ""; - const noTx = undefined; + const { description, query } = prismaQuery + const jsonQuery = JSON.stringify(query) + const irrelevantTraceId = '' + const noTx = undefined group(description, () => { bench( - "Web Assembly: Baseline", + 'Web Assembly: Baseline', async () => - await wasmBaseline.query(jsonQuery, irrelevantTraceId, noTx, nextRequestId()) - ); + await wasmBaseline.query( + jsonQuery, + irrelevantTraceId, + noTx, + nextRequestId(), + ), + ) bench( - "Web Assembly: Latest", - async () => await wasmLatest.query(jsonQuery, irrelevantTraceId, noTx, nextRequestId()) - ); + 'Web Assembly: Latest', + async () => + await wasmLatest.query( + jsonQuery, + irrelevantTraceId, + noTx, + nextRequestId(), + ), + ) baseline( - "Web Assembly: Current", + 'Web Assembly: Current', async () => - await wasmCurrent.query(jsonQuery, irrelevantTraceId, noTx, nextRequestId()) - ); + await wasmCurrent.query( + jsonQuery, + irrelevantTraceId, + noTx, + nextRequestId(), + ), + ) bench( - "Node API: Current", - async () => await napi.query(jsonQuery, irrelevantTraceId, noTx, nextRequestId()) - ); - }); + 'Node API: Current', + async () => + await napi.query( + jsonQuery, + irrelevantTraceId, + noTx, + nextRequestId(), + ), + ) + }) } await run({ colors: false, collect: true, - }); + }) } finally { - await napi.disconnect("", nextRequestId()); - await wasmCurrent.disconnect("", nextRequestId()); - await wasmBaseline.disconnect("", nextRequestId()); - await wasmLatest.disconnect("", nextRequestId()); + await napi.disconnect('', nextRequestId()) + await wasmCurrent.disconnect('', nextRequestId()) + await wasmBaseline.disconnect('', nextRequestId()) + await wasmLatest.disconnect('', nextRequestId()) } } // conditional debug logging based on LOG_LEVEL env var const debug = (() => { - if ((process.env.LOG_LEVEL ?? "").toLowerCase() != "debug") { - return (...args: any[]) => {}; + if ((process.env.LOG_LEVEL ?? '').toLowerCase() != 'debug') { + return (...args: any[]) => {} } return (...args: any[]) => { - console.error("[nodejs] DEBUG:", ...args); - }; -})(); + console.error('[nodejs] DEBUG:', ...args) + } +})() async function pgAdapter(url: string): Promise { - const schemaName = new URL(url).searchParams.get("schema") ?? undefined; - let args: any = { connectionString: url }; + const schemaName = new URL(url).searchParams.get('schema') ?? undefined + let args: any = { connectionString: url } if (schemaName != null) { - args.options = `--search_path="${schemaName}"`; + args.options = `--search_path="${schemaName}"` } - const pool = new pg.Pool(args); + const pool = new pg.Pool(args) return new prismaPg.PrismaPg(pool, { schema: schemaName, - }); + }) } async function initQeNapiCurrent( adapter: DriverAdapter, - datamodel: string + datamodel: string, ): Promise { - return await qe.initQueryEngine("Napi", adapter, datamodel, debug, debug); + return await qe.initQueryEngine('Napi', adapter, datamodel, debug, debug) } async function initQeWasmCurrent( adapter: DriverAdapter, - datamodel: string + datamodel: string, ): Promise { return await qe.initQueryEngine( - "Wasm", + 'Wasm', adapter, datamodel, (...args) => {}, - debug - ); + debug, + ) } async function initQeWasmLatest( adapter: DriverAdapter, - datamodel: string + datamodel: string, ): Promise { - return new WasmLatest(qe.queryEngineOptions(datamodel), debug, adapter); + return new WasmLatest(qe.queryEngineOptions(datamodel), debug, adapter) } function initQeWasmBaseLine( adapter: DriverAdapter, - datamodel: string + datamodel: string, ): qe.QueryEngine { - return new WasmBaseline(qe.queryEngineOptions(datamodel), debug, adapter); + return new WasmBaseline(qe.queryEngineOptions(datamodel), debug, adapter) } main().catch((err) => { - console.error(err); - process.exit(1); -}); + console.error(err) + process.exit(1) +}) diff --git a/query-engine/driver-adapters/executor/src/driver-adapters-manager/neon.ws.ts b/query-engine/driver-adapters/executor/src/driver-adapters-manager/neon.ws.ts index 66cdfc6e5a42..0a8e3ddae849 100644 --- a/query-engine/driver-adapters/executor/src/driver-adapters-manager/neon.ws.ts +++ b/query-engine/driver-adapters/executor/src/driver-adapters-manager/neon.ws.ts @@ -2,7 +2,7 @@ import { PrismaNeon } from '@prisma/adapter-neon' import { neon } from '@prisma/bundled-js-drivers' import { DriverAdapter } from '@prisma/driver-adapter-utils' import { WebSocket } from 'ws' -import { postgresSchemaName, postgres_options } from '../utils' +import { postgresSchemaName, postgresOptions } from '../utils' import type { DriverAdaptersManager } from './index' import type { DriverAdapterTag, EnvForAdapter } from '../types' @@ -34,7 +34,7 @@ export class NeonWsManager implements DriverAdaptersManager { const schemaName = postgresSchemaName(url) - this.#driver = new Pool(postgres_options(url)) + this.#driver = new Pool(postgresOptions(url)) this.#adapter = new PrismaNeon(this.#driver, { schema: schemaName }) as DriverAdapter return this.#adapter diff --git a/query-engine/driver-adapters/executor/src/driver-adapters-manager/pg.ts b/query-engine/driver-adapters/executor/src/driver-adapters-manager/pg.ts index a3b28e119651..52c75b4526d1 100644 --- a/query-engine/driver-adapters/executor/src/driver-adapters-manager/pg.ts +++ b/query-engine/driver-adapters/executor/src/driver-adapters-manager/pg.ts @@ -1,7 +1,7 @@ import { PrismaPg } from '@prisma/adapter-pg' import { pg } from '@prisma/bundled-js-drivers' import { DriverAdapter } from '@prisma/driver-adapter-utils' -import { postgresSchemaName, postgres_options } from '../utils' +import { postgresSchemaName, postgresOptions } from '../utils' import type { ConnectParams, DriverAdaptersManager } from './index' import type { DriverAdapterTag, EnvForAdapter } from '../types' @@ -21,7 +21,7 @@ export class PgManager implements DriverAdaptersManager { async connect({ url }: ConnectParams) { const schemaName = postgresSchemaName(url) - this.#driver = new pg.Pool(postgres_options(url)) + this.#driver = new pg.Pool(postgresOptions(url)) this.#adapter = new PrismaPg(this.#driver, { schema: schemaName }) as DriverAdapter return this.#adapter diff --git a/query-engine/driver-adapters/executor/src/engines/Library.ts b/query-engine/driver-adapters/executor/src/engines/Library.ts index 7fd9d0ab7f76..7e13f0a406c3 100644 --- a/query-engine/driver-adapters/executor/src/engines/Library.ts +++ b/query-engine/driver-adapters/executor/src/engines/Library.ts @@ -23,7 +23,7 @@ export type QueryEngineInstance = { } export interface QueryEngineConstructor { - new(config: QueryEngineConfig, logger: (log: string) => void, nodejsFnCtx?: DriverAdapter): QueryEngineInstance + new (config: QueryEngineConfig, logger: (log: string) => void, nodejsFnCtx?: DriverAdapter): QueryEngineInstance } export interface LibraryLoader { diff --git a/query-engine/driver-adapters/executor/src/engines/QueryEngine.ts b/query-engine/driver-adapters/executor/src/engines/QueryEngine.ts index 416da634fc91..ed824b8c20df 100644 --- a/query-engine/driver-adapters/executor/src/engines/QueryEngine.ts +++ b/query-engine/driver-adapters/executor/src/engines/QueryEngine.ts @@ -5,36 +5,35 @@ import * as Transaction from './Transaction' export type QueryEngineEvent = QueryEngineLogEvent | QueryEngineQueryEvent | QueryEnginePanicEvent export type QueryEngineLogEvent = { - level: string - module_path: string - message: string - span?: boolean + level: string + module_path: string + message: string + span?: boolean } export type QueryEngineQueryEvent = { - level: 'info' - module_path: string - query: string - item_type: 'query' - params: string - duration_ms: string - result: string + level: 'info' + module_path: string + query: string + item_type: 'query' + params: string + duration_ms: string + result: string } export type QueryEnginePanicEvent = { - level: 'error' - module_path: string - message: 'PANIC' - reason: string - file: string - line: string - column: string + level: 'error' + module_path: string + message: 'PANIC' + reason: string + file: string + line: string + column: string } - export type GraphQLQuery = { - query: string - variables: object + query: string + variables: object } export type EngineProtocol = 'graphql' | 'json' @@ -43,47 +42,47 @@ export type EngineQuery = GraphQLQuery | JsonQuery export type EngineBatchQueries = GraphQLQuery[] | JsonQuery[] export type QueryEngineConfig = { - // TODO rename datamodel here and other places - datamodel: string - configDir: string - logQueries: boolean - ignoreEnvVarErrors: boolean - datasourceOverrides?: Record - env: Record - logLevel?: string - engineProtocol: EngineProtocol + // TODO rename datamodel here and other places + datamodel: string + configDir: string + logQueries: boolean + ignoreEnvVarErrors: boolean + datasourceOverrides?: Record + env: Record + logLevel?: string + engineProtocol: EngineProtocol } // Errors export type SyncRustError = { - is_panic: boolean - message: string - meta: { - full_error: string - } - error_code: string + is_panic: boolean + message: string + meta: { + full_error: string + } + error_code: string } export type RustRequestError = { - is_panic: boolean - message: string - backtrace: string + is_panic: boolean + message: string + backtrace: string } export type QueryEngineResult = { - data: T - elapsed: number + data: T + elapsed: number } export type QueryEngineBatchRequest = QueryEngineBatchGraphQLRequest | JsonBatchQuery export type QueryEngineBatchGraphQLRequest = { - batch: QueryEngineRequest[] - transaction?: boolean - isolationLevel?: Transaction.IsolationLevel + batch: QueryEngineRequest[] + transaction?: boolean + isolationLevel?: Transaction.IsolationLevel } export type QueryEngineRequest = { - query: string - variables: Object + query: string + variables: Object } diff --git a/query-engine/driver-adapters/executor/src/qe.ts b/query-engine/driver-adapters/executor/src/qe.ts deleted file mode 100644 index a3f385413e33..000000000000 --- a/query-engine/driver-adapters/executor/src/qe.ts +++ /dev/null @@ -1,76 +0,0 @@ -import type { DriverAdapter } from "@prisma/driver-adapter-utils"; -import * as napi from "./engines/Library"; -import * as os from "node:os"; -import * as path from "node:path"; -import { __dirname } from './utils' - -export interface QueryEngine { - connect(trace: string, requestId: string): Promise; - disconnect(trace: string, requestId: string): Promise; - query(body: string, trace: string, tx_id: string | undefined, requestId: string): Promise; - startTransaction(input: string, trace: string, requestId: string): Promise; - commitTransaction(tx_id: string, trace: string, requestId: string): Promise; - rollbackTransaction(tx_id: string, trace: string, requestId: string): Promise; -} - -export type QueryLogCallback = (log: string) => void; - -export async function initQueryEngine( - engineType: "Napi" | "Wasm", - adapter: DriverAdapter, - datamodel: string, - queryLogCallback: QueryLogCallback, - debug: (...args: any[]) => void -): Promise { - const logCallback = (event: any) => { - const parsed = JSON.parse(event); - if (parsed.is_query) { - queryLogCallback(parsed.query); - } - debug(parsed); - }; - - const options = queryEngineOptions(datamodel); - - if (engineType === "Wasm") { - const { getEngineForProvider } = await import("./wasm"); - const WasmQueryEngine = await getEngineForProvider(adapter.provider) - return new WasmQueryEngine(options, logCallback, adapter); - } else { - const { QueryEngine } = loadNapiEngine(); - return new QueryEngine(options, logCallback, adapter); - } -} - -export function queryEngineOptions(datamodel: string) { - return { - datamodel, - configDir: ".", - engineProtocol: "json" as const, - logLevel: process.env["RUST_LOG"] ?? ("info" as any), - logQueries: true, - env: process.env, - ignoreEnvVarErrors: false, - enableTracing: true, - }; -} - -function loadNapiEngine(): napi.Library { - // I assume nobody will run this on Windows ¯\_(ツ)_/¯ - const libExt = os.platform() === "darwin" ? "dylib" : "so"; - const target = - process.env.TARGET || process.env.PROFILE == "release" - ? "release" - : "debug"; - - const libQueryEnginePath = path.resolve( - __dirname, - `../../../../target/${target}/libquery_engine.${libExt}` - ); - - const libqueryEngine = { exports: {} as unknown as napi.Library }; - // @ts-ignore - process.dlopen(libqueryEngine, libQueryEnginePath); - - return libqueryEngine.exports; -} diff --git a/query-engine/driver-adapters/executor/src/query-engine-wasm.ts b/query-engine/driver-adapters/executor/src/query-engine-wasm.ts new file mode 100644 index 000000000000..b690de0a8650 --- /dev/null +++ b/query-engine/driver-adapters/executor/src/query-engine-wasm.ts @@ -0,0 +1,43 @@ +import * as wasmPostgres from '../../../query-engine-wasm/pkg/postgresql/query_engine_bg.js' +import * as wasmMysql from '../../../query-engine-wasm/pkg/mysql/query_engine_bg.js' +import * as wasmSqlite from '../../../query-engine-wasm/pkg/sqlite/query_engine_bg.js' +import fs from 'node:fs/promises' +import path from 'node:path' +import { __dirname } from './utils' + +const wasm = { + postgres: wasmPostgres, + mysql: wasmMysql, + sqlite: wasmSqlite, +} + +type EngineName = keyof typeof wasm + +const initializedModules = new Set() + +export async function getQueryEngineForProvider(provider: EngineName) { + const engine = wasm[provider] + if (!initializedModules.has(provider)) { + const subDir = provider === 'postgres' ? 'postgresql' : provider + const bytes = await fs.readFile( + path.resolve( + __dirname, + '..', + '..', + '..', + 'query-engine-wasm', + 'pkg', + subDir, + 'query_engine_bg.wasm', + ), + ) + const module = new WebAssembly.Module(bytes) + const instance = new WebAssembly.Instance(module, { + './query_engine_bg.js': engine, + }) + engine.__wbg_set_wasm(instance.exports) + initializedModules.add(provider) + } + + return engine.QueryEngine +} diff --git a/query-engine/driver-adapters/executor/src/query-engine.ts b/query-engine/driver-adapters/executor/src/query-engine.ts new file mode 100644 index 000000000000..b24e9d6178da --- /dev/null +++ b/query-engine/driver-adapters/executor/src/query-engine.ts @@ -0,0 +1,73 @@ +import type { DriverAdapter } from '@prisma/driver-adapter-utils' +import * as napi from './engines/Library' +import * as os from 'node:os' +import * as path from 'node:path' +import { __dirname } from './utils' + +export interface QueryEngine { + connect(trace: string, requestId: string): Promise + disconnect(trace: string, requestId: string): Promise + query(body: string, trace: string, tx_id: string | undefined, requestId: string): Promise + startTransaction(input: string, trace: string, requestId: string): Promise + commitTransaction(tx_id: string, trace: string, requestId: string): Promise + rollbackTransaction(tx_id: string, trace: string, requestId: string): Promise +} + +export type QueryLogCallback = (log: string) => void + +export async function initQueryEngine( + engineType: 'Napi' | 'Wasm', + adapter: DriverAdapter, + datamodel: string, + queryLogCallback: QueryLogCallback, + debug: (...args: any[]) => void, +): Promise { + const logCallback = (event: any) => { + const parsed = JSON.parse(event) + if (parsed.is_query) { + queryLogCallback(parsed.query) + } + debug(parsed) + } + + const options = queryEngineOptions(datamodel) + + if (engineType === 'Wasm') { + const { getQueryEngineForProvider: getEngineForProvider } = await import('./query-engine-wasm') + const WasmQueryEngine = await getEngineForProvider(adapter.provider) + return new WasmQueryEngine(options, logCallback, adapter) + } else { + const { QueryEngine } = loadNapiEngine() + return new QueryEngine(options, logCallback, adapter) + } +} + +export function queryEngineOptions(datamodel: string) { + return { + datamodel, + configDir: '.', + engineProtocol: 'json' as const, + logLevel: process.env['RUST_LOG'] ?? ('info' as any), + logQueries: true, + env: process.env, + ignoreEnvVarErrors: false, + enableTracing: true, + } +} + +function loadNapiEngine(): napi.Library { + // I assume nobody will run this on Windows ¯\_(ツ)_/¯ + const libExt = os.platform() === 'darwin' ? 'dylib' : 'so' + const target = process.env.TARGET || process.env.PROFILE == 'release' ? 'release' : 'debug' + + const libQueryEnginePath = path.resolve( + __dirname, + `../../../../target/${target}/libquery_engine.${libExt}`, + ) + + const libqueryEngine = { exports: {} as unknown as napi.Library } + // @ts-ignore + process.dlopen(libqueryEngine, libQueryEnginePath) + + return libqueryEngine.exports +} diff --git a/query-engine/driver-adapters/executor/src/recording.ts b/query-engine/driver-adapters/executor/src/recording.ts index 5ac0f52b4cb7..0349e17cef90 100644 --- a/query-engine/driver-adapters/executor/src/recording.ts +++ b/query-engine/driver-adapters/executor/src/recording.ts @@ -1,20 +1,15 @@ -import type { - DriverAdapter, - Query, - Result, - ResultSet, -} from "@prisma/driver-adapter-utils" +import type { DriverAdapter, Query, Result, ResultSet } from '@prisma/driver-adapter-utils' -type Recordings = ReturnType; +type Recordings = ReturnType export function recording(adapter: DriverAdapter) { - const recordings = createInMemoryRecordings(); + const recordings = createInMemoryRecordings() return { recorder: recorder(adapter, recordings), replayer: replayer(adapter, recordings), recordings: recordings, - }; + } } function recorder(adapter: DriverAdapter, recordings: Recordings) { @@ -22,18 +17,18 @@ function recorder(adapter: DriverAdapter, recordings: Recordings) { provider: adapter.provider, adapterName: adapter.adapterName, transactionContext: () => { - throw new Error("Not implemented"); + throw new Error('Not implemented') }, getConnectionInfo: () => { - return adapter.getConnectionInfo!(); + return adapter.getConnectionInfo!() }, queryRaw: async (params) => { - const result = await adapter.queryRaw(params); - recordings.addQueryResults(params, result); - return result; + const result = await adapter.queryRaw(params) + recordings.addQueryResults(params, result) + return result }, executeRaw: async (params) => { - throw new Error("Not implemented"); + throw new Error('Not implemented') }, } satisfies DriverAdapter } @@ -44,71 +39,71 @@ function replayer(adapter: DriverAdapter, recordings: Recordings) { adapterName: adapter.adapterName, recordings: recordings, transactionContext: () => { - throw new Error("Not implemented"); + throw new Error('Not implemented') }, getConnectionInfo: () => { - return adapter.getConnectionInfo!(); + return adapter.getConnectionInfo!() }, queryRaw: async (params) => { - return recordings.getQueryResults(params); + return recordings.getQueryResults(params) }, executeRaw: async (params) => { - return recordings.getCommandResults(params); + return recordings.getCommandResults(params) }, } satisfies DriverAdapter & { recordings: Recordings } } function createInMemoryRecordings() { - const queryResults: Map> = new Map(); - const commandResults: Map> = new Map(); + const queryResults: Map> = new Map() + const commandResults: Map> = new Map() const queryToKey = (params: Query) => { - var sql = params.sql; + var sql = params.sql params.args.forEach((arg: any, i) => { - sql = sql.replace("$" + (i + 1), arg.toString()); - }); - return sql; - }; + sql = sql.replace('$' + (i + 1), arg.toString()) + }) + return sql + } return { data: (): Map => { - const map = new Map(); + const map = new Map() for (const [key, value] of queryResults.entries()) { value.map((resultSet) => { - map[key] = resultSet; - }); + map[key] = resultSet + }) } - return map; + return map }, addQueryResults: (params: Query, result: Result) => { - const key = queryToKey(params); - queryResults.set(key, result); + const key = queryToKey(params) + queryResults.set(key, result) }, getQueryResults: (params: Query) => { - const key = queryToKey(params); + const key = queryToKey(params) if (!queryResults.has(key)) { - throw new Error(`Query not recorded: ${key}`); + throw new Error(`Query not recorded: ${key}`) } - return queryResults.get(key)!; + return queryResults.get(key)! }, addCommandResults: (params: Query, result: Result) => { - const key = queryToKey(params); - commandResults.set(key, result); + const key = queryToKey(params) + commandResults.set(key, result) }, getCommandResults: (params: Query) => { - const key = queryToKey(params); - + const key = queryToKey(params) + if (!commandResults.has(key)) { - throw new Error(`Command not recorded: ${key}`); + throw new Error(`Command not recorded: ${key}`) } - return commandResults.get(key)!; + return commandResults.get(key)! }, - }; + } } diff --git a/query-engine/driver-adapters/executor/src/requestId.ts b/query-engine/driver-adapters/executor/src/requestId.ts index bb88a87a7c4e..8fff8d1d2a62 100644 --- a/query-engine/driver-adapters/executor/src/requestId.ts +++ b/query-engine/driver-adapters/executor/src/requestId.ts @@ -1,11 +1,11 @@ -let NEXT_REQUEST_ID = 1n; -const MAX_REQUEST_ID = 0xffffffffffffffffn; +let NEXT_REQUEST_ID = 1n +const MAX_REQUEST_ID = 0xffffffffffffffffn export function nextRequestId(): string { - const id = NEXT_REQUEST_ID.toString(); - NEXT_REQUEST_ID++; - if (NEXT_REQUEST_ID > MAX_REQUEST_ID) { - NEXT_REQUEST_ID = 1n; - } - return id; + const id = NEXT_REQUEST_ID.toString() + NEXT_REQUEST_ID++ + if (NEXT_REQUEST_ID > MAX_REQUEST_ID) { + NEXT_REQUEST_ID = 1n + } + return id } diff --git a/query-engine/driver-adapters/executor/src/rn.ts b/query-engine/driver-adapters/executor/src/rn.ts index d3c62b3e40e7..8ef3a7ec511e 100644 --- a/query-engine/driver-adapters/executor/src/rn.ts +++ b/query-engine/driver-adapters/executor/src/rn.ts @@ -1,96 +1,96 @@ export function createRNEngineConnector( url: string, schema: string, - logCallback: (msg: string) => void + logCallback: (msg: string) => void, ) { const headers = { - "Content-Type": "application/json", - Accept: "application/json", - }; + 'Content-Type': 'application/json', + Accept: 'application/json', + } return { connect: async () => { const res = await fetch(`${url}/connect`, { - method: "POST", - mode: "no-cors", + method: 'POST', + mode: 'no-cors', headers, body: JSON.stringify({ schema }), - }); + }) - return await res.json(); + return await res.json() }, query: async ( body: string, trace: string, - txId: string + txId: string, ): Promise => { const res = await fetch(`${url}/query`, { - method: "POST", - mode: "no-cors", + method: 'POST', + mode: 'no-cors', headers, body: JSON.stringify({ body, trace, txId, }), - }); + }) - const response = await res.json(); + const response = await res.json() if (response.logs.length) { - response.logs.forEach(logCallback); + response.logs.forEach(logCallback) } - return response.engineResponse; + return response.engineResponse }, startTransaction: async (body: string, trace: string): Promise => { const res = await fetch(`${url}/start_transaction`, { - method: "POST", - mode: "no-cors", + method: 'POST', + mode: 'no-cors', headers, body: JSON.stringify({ body, trace, }), - }); - return await res.json(); + }) + return await res.json() }, commitTransaction: async (txId: string, trace: string): Promise => { const res = await fetch(`${url}/commit_transaction`, { - method: "POST", - mode: "no-cors", + method: 'POST', + mode: 'no-cors', headers, body: JSON.stringify({ txId, trace, }), - }); - return res.json(); + }) + return res.json() }, rollbackTransaction: async ( txId: string, - trace: string + trace: string, ): Promise => { const res = await fetch(`${url}/rollback_transaction`, { - method: "POST", - mode: "no-cors", + method: 'POST', + mode: 'no-cors', headers, body: JSON.stringify({ txId, trace, }), - }); - return res.json(); + }) + return res.json() }, disconnect: async (trace: string) => { await fetch(`${url}/disconnect`, { - method: "POST", - mode: "no-cors", + method: 'POST', + mode: 'no-cors', headers, body: JSON.stringify({ trace, }), - }); + }) }, - }; + } } diff --git a/query-engine/driver-adapters/executor/src/setup.ts b/query-engine/driver-adapters/executor/src/setup.ts new file mode 100644 index 000000000000..e79d1f0b1d0c --- /dev/null +++ b/query-engine/driver-adapters/executor/src/setup.ts @@ -0,0 +1,33 @@ +import { match } from 'ts-pattern' +import { type DriverAdaptersManager } from './driver-adapters-manager' +import type { Env } from './types' +import { PgManager } from './driver-adapters-manager/pg' +import { NeonWsManager } from './driver-adapters-manager/neon.ws' +import { LibSQLManager } from './driver-adapters-manager/libsql' +import { PlanetScaleManager } from './driver-adapters-manager/planetscale' +import { D1Manager } from './driver-adapters-manager/d1' + +export async function setupDriverAdaptersManager( + env: Env, + migrationScript?: string, +): Promise { + return match(env) + .with({ DRIVER_ADAPTER: 'pg' }, async (env) => await PgManager.setup(env)) + .with( + { DRIVER_ADAPTER: 'neon:ws' }, + async (env) => await NeonWsManager.setup(env), + ) + .with( + { DRIVER_ADAPTER: 'libsql' }, + async (env) => await LibSQLManager.setup(env), + ) + .with( + { DRIVER_ADAPTER: 'planetscale' }, + async (env) => await PlanetScaleManager.setup(env), + ) + .with( + { DRIVER_ADAPTER: 'd1' }, + async (env) => await D1Manager.setup(env, migrationScript), + ) + .exhaustive() +} diff --git a/query-engine/driver-adapters/executor/src/testd-qe.ts b/query-engine/driver-adapters/executor/src/testd-qe.ts new file mode 100644 index 000000000000..3095495b558a --- /dev/null +++ b/query-engine/driver-adapters/executor/src/testd-qe.ts @@ -0,0 +1,243 @@ +import * as readline from 'node:readline' +import * as S from '@effect/schema/Schema' +import { bindAdapter, ErrorCapturingDriverAdapter } from '@prisma/driver-adapter-utils' + +import type { DriverAdaptersManager } from './driver-adapters-manager' +import { Env, jsonRpc } from './types' +import * as qe from './query-engine' +import { nextRequestId } from './requestId' +import { createRNEngineConnector } from './rn' +import { debug, err } from './utils' +import { setupDriverAdaptersManager } from './setup' +import { SchemaId } from './types/jsonRpc' + +async function main(): Promise { + const env = S.decodeUnknownSync(Env)(process.env) + console.log('[env]', env) + + const iface = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false, + }) + + iface.on('line', async (line) => { + try { + const request = S.decodeSync(jsonRpc.RequestFromString)(line) + debug(`Got a request: ${line}`) + + try { + const response = await handleRequest(request, env) + respondOk(request.id, response) + } catch (err) { + debug('[nodejs] Error from request handler: ', err) + respondErr(request.id, { + code: 1, + message: err.stack ?? err.toString(), + }) + } + } catch (err) { + debug('Received non-json line: ', line) + console.error(err) + } + }) +} + +const state: Record< + SchemaId, + { + engine: qe.QueryEngine + driverAdapterManager: DriverAdaptersManager + adapter: ErrorCapturingDriverAdapter | null + logs: string[] + } +> = {} + +async function handleRequest( + { method, params }: jsonRpc.Request, + env: Env, +): Promise { + if (method !== 'initializeSchema') { + if (state[params.schemaId] === undefined) { + throw new Error(`Schema with id ${params.schemaId} is not initialized. Please call 'initializeSchema' first.`) + } + } + + switch (method) { + case 'initializeSchema': { + debug('Got `initializeSchema', params) + + const { url, schema, schemaId, migrationScript } = params + const logs = [] as string[] + + const logCallback = (log) => { + logs.push(log) + } + + const driverAdapterManager = await setupDriverAdaptersManager( + env, + migrationScript, + ) + + const { engine, adapter } = await initQe({ + env, + url, + driverAdapterManager, + schema, + logCallback, + }) + await engine.connect('', nextRequestId()) + + state[schemaId] = { + engine, + driverAdapterManager, + adapter, + logs, + } + + if (adapter && adapter.getConnectionInfo) { + const maxBindValuesResult = adapter.getConnectionInfo().map((info) => info.maxBindValues) + if (maxBindValuesResult.ok) { + return { maxBindValues: maxBindValuesResult.value } + } + } + + return { maxBindValues: null } + } + case 'query': { + debug('Got `query`', params) + const { query, schemaId, txId } = params + const engine = state[schemaId].engine + const result = await engine.query(JSON.stringify(query), '', txId ?? undefined, nextRequestId()) + + const parsedResult = JSON.parse(result) + if (parsedResult.errors) { + const error = parsedResult.errors[0]?.user_facing_error + if (error.error_code === 'P2036') { + const jsError = state[schemaId].adapter?.errorRegistry.consumeError( + error.meta.id, + ) + if (!jsError) { + err( + `Something went wrong. Engine reported external error with id ${error.meta.id}, but it was not registered.`, + ) + } else { + err( + 'got error response from the engine caused by the driver: ', + jsError, + ) + } + } + } + + debug('🟢 Engine response: ', result) + // returning unparsed string: otherwise, some information gots lost during this round-trip. + // In particular, floating point without decimal part turn into integers + return result + } + + case 'startTx': { + debug('Got `startTx', params) + const { schemaId, options } = params + const result = await state[schemaId].engine.startTransaction( + JSON.stringify(options), + '', + nextRequestId(), + ) + return JSON.parse(result) + } + + case 'commitTx': { + debug('Got `commitTx', params) + const { schemaId, txId } = params + const result = await state[schemaId].engine.commitTransaction(txId, '{}', nextRequestId()) + return JSON.parse(result) + } + + case 'rollbackTx': { + debug('Got `rollbackTx', params) + const { schemaId, txId } = params + const result = await state[schemaId].engine.rollbackTransaction( + txId, + '{}', + nextRequestId(), + ) + return JSON.parse(result) + } + case 'teardown': { + debug('Got `teardown', params) + const { schemaId } = params + + await state[schemaId].engine.disconnect('', nextRequestId()) + await state[schemaId].driverAdapterManager.teardown() + delete state[schemaId] + + return {} + } + case 'getLogs': { + const { schemaId } = params + return state[schemaId].logs + } + default: { + throw new Error(`Unknown method: \`${method}\``) + } + } +} + +function respondErr(requestId: number, error: jsonRpc.RpcError) { + const msg: jsonRpc.ErrResponse = { + jsonrpc: '2.0', + id: requestId, + error, + } + console.log(JSON.stringify(msg)) +} + +function respondOk(requestId: number, payload: unknown) { + const msg: jsonRpc.OkResponse = { + jsonrpc: '2.0', + id: requestId, + result: payload, + } + console.log(JSON.stringify(msg)) +} + +type InitQueryEngineParams = { + env: Env + driverAdapterManager: DriverAdaptersManager + url: string + schema: string + logCallback: qe.QueryLogCallback +} + +async function initQe({ + env, + driverAdapterManager, + url, + schema, + logCallback, +}: InitQueryEngineParams) { + if (env.EXTERNAL_TEST_EXECUTOR === 'Mobile') { + url = env.MOBILE_EMULATOR_URL + + const engine = createRNEngineConnector(url, schema, logCallback) + return { engine, adapter: null } + } + + const adapter = await driverAdapterManager.connect({ url }) + const errorCapturingAdapter = bindAdapter(adapter) + const engineInstance = await qe.initQueryEngine( + env.EXTERNAL_TEST_EXECUTOR, + errorCapturingAdapter, + schema, + logCallback, + debug, + ) + + return { + engine: engineInstance, + adapter: errorCapturingAdapter, + } +} + +main().catch(err) diff --git a/query-engine/driver-adapters/executor/src/testd.ts b/query-engine/driver-adapters/executor/src/testd.ts deleted file mode 100644 index 6a5a3665da25..000000000000 --- a/query-engine/driver-adapters/executor/src/testd.ts +++ /dev/null @@ -1,285 +0,0 @@ -import * as readline from "node:readline"; -import { match } from "ts-pattern"; -import * as S from "@effect/schema/Schema"; -import { - bindAdapter, - ErrorCapturingDriverAdapter, -} from "@prisma/driver-adapter-utils"; -import { webcrypto } from "node:crypto"; - -import type { DriverAdaptersManager } from "./driver-adapters-manager"; -import { jsonRpc, Env } from "./types"; -import * as qe from "./qe"; -import { PgManager } from "./driver-adapters-manager/pg"; -import { NeonWsManager } from "./driver-adapters-manager/neon.ws"; -import { LibSQLManager } from "./driver-adapters-manager/libsql"; -import { PlanetScaleManager } from "./driver-adapters-manager/planetscale"; -import { D1Manager } from "./driver-adapters-manager/d1"; -import { nextRequestId } from "./requestId"; -import { createRNEngineConnector } from "./rn"; - -if (!global.crypto) { - global.crypto = webcrypto as Crypto; -} - -async function initialiseDriverAdapterManager( - env: Env, - migrationScript?: string -): Promise { - return match(env) - .with({ DRIVER_ADAPTER: "pg" }, async (env) => await PgManager.setup(env)) - .with( - { DRIVER_ADAPTER: "neon:ws" }, - async (env) => await NeonWsManager.setup(env) - ) - .with( - { DRIVER_ADAPTER: "libsql" }, - async (env) => await LibSQLManager.setup(env) - ) - .with( - { DRIVER_ADAPTER: "planetscale" }, - async (env) => await PlanetScaleManager.setup(env) - ) - .with( - { DRIVER_ADAPTER: "d1" }, - async (env) => await D1Manager.setup(env, migrationScript) - ) - .exhaustive(); -} - -// conditional debug logging based on LOG_LEVEL env var -const debug = (() => { - if ((process.env.LOG_LEVEL ?? "").toLowerCase() != "debug") { - return (...args: any[]) => {}; - } - - return (...args: any[]) => { - console.error("[nodejs] DEBUG:", ...args); - }; -})(); - -// error logger -const err = (...args: any[]) => console.error("[nodejs] ERROR:", ...args); - -async function main(): Promise { - const env = S.decodeUnknownSync(Env)(process.env); - console.log("[env]", env); - - const iface = readline.createInterface({ - input: process.stdin, - output: process.stdout, - terminal: false, - }); - - iface.on("line", async (line) => { - try { - const request = S.decodeSync(jsonRpc.RequestFromString)(line); - debug(`Got a request: ${line}`); - - try { - const response = await handleRequest(request, env); - respondOk(request.id, response); - } catch (err) { - debug("[nodejs] Error from request handler: ", err); - respondErr(request.id, { - code: 1, - message: err.stack ?? err.toString(), - }); - } - } catch (err) { - debug("Received non-json line: ", line); - console.error(err); - } - }); -} - -const state: Record< - number, - { - engine: qe.QueryEngine; - driverAdapterManager: DriverAdaptersManager; - adapter: ErrorCapturingDriverAdapter | null; - logs: string[]; - } -> = {}; - -async function handleRequest( - { method, params }: jsonRpc.Request, - env: Env -): Promise { - switch (method) { - case "initializeSchema": { - const { url, schema, schemaId, migrationScript } = params; - const logs = [] as string[]; - - const logCallback = (log) => { - logs.push(log); - }; - - const driverAdapterManager = await initialiseDriverAdapterManager( - env, - migrationScript - ); - - const { engine, adapter } = await initQe({ - env, - url, - driverAdapterManager, - schema, - logCallback, - }); - await engine.connect("", nextRequestId()); - - state[schemaId] = { - engine, - driverAdapterManager, - adapter, - logs, - }; - - if (adapter && adapter.getConnectionInfo) { - const maxBindValuesResult = adapter.getConnectionInfo().map(info => info.maxBindValues) - if (maxBindValuesResult.ok) { - return { maxBindValues: maxBindValuesResult.value } - } - } - - return { maxBindValues: null } - } - case "query": { - debug("Got `query`", params); - const { query, schemaId, txId } = params; - const engine = state[schemaId].engine; - const result = await engine.query(JSON.stringify(query), "", txId ?? undefined, nextRequestId()); - - const parsedResult = JSON.parse(result); - if (parsedResult.errors) { - const error = parsedResult.errors[0]?.user_facing_error; - if (error.error_code === "P2036") { - const jsError = state[schemaId].adapter?.errorRegistry.consumeError( - error.meta.id - ); - if (!jsError) { - err( - `Something went wrong. Engine reported external error with id ${error.meta.id}, but it was not registered.` - ); - } else { - err( - "got error response from the engine caused by the driver: ", - jsError - ); - } - } - } - - debug("🟢 Engine response: ", result); - // returning unparsed string: otherwise, some information gots lost during this round-trip. - // In particular, floating point without decimal part turn into integers - return result; - } - - case "startTx": { - debug("Got `startTx", params); - const { schemaId, options } = params; - const result = await state[schemaId].engine.startTransaction( - JSON.stringify(options), - "", - nextRequestId(), - ); - return JSON.parse(result); - } - - case "commitTx": { - debug("Got `commitTx", params); - const { schemaId, txId } = params; - const result = await state[schemaId].engine.commitTransaction(txId, "{}", nextRequestId()); - return JSON.parse(result); - } - - case "rollbackTx": { - debug("Got `rollbackTx", params); - const { schemaId, txId } = params; - const result = await state[schemaId].engine.rollbackTransaction( - txId, - "{}", - nextRequestId(), - ); - return JSON.parse(result); - } - case "teardown": { - debug("Got `teardown", params); - const { schemaId } = params; - - await state[schemaId].engine.disconnect("", nextRequestId()); - await state[schemaId].driverAdapterManager.teardown(); - delete state[schemaId]; - - return {}; - } - case "getLogs": { - const { schemaId } = params; - return state[schemaId].logs; - } - default: { - throw new Error(`Unknown method: \`${method}\``); - } - } -} - -function respondErr(requestId: number, error: jsonRpc.RpcError) { - const msg: jsonRpc.ErrResponse = { - jsonrpc: "2.0", - id: requestId, - error, - }; - console.log(JSON.stringify(msg)); -} - -function respondOk(requestId: number, payload: unknown) { - const msg: jsonRpc.OkResponse = { - jsonrpc: "2.0", - id: requestId, - result: payload, - }; - console.log(JSON.stringify(msg)); -} - -type InitQueryEngineParams = { - env: Env - driverAdapterManager: DriverAdaptersManager - url: string - schema: string - logCallback: qe.QueryLogCallback -}; - -async function initQe({ - env, - driverAdapterManager, - url, - schema, - logCallback, -}: InitQueryEngineParams) { - if (env.EXTERNAL_TEST_EXECUTOR === 'Mobile') { - url = env.MOBILE_EMULATOR_URL; - - const engine = createRNEngineConnector(url, schema, logCallback); - return { engine, adapter: null }; - } - - const adapter = await driverAdapterManager.connect({ url }) - const errorCapturingAdapter = bindAdapter(adapter) - const engineInstance = await qe.initQueryEngine( - env.EXTERNAL_TEST_EXECUTOR, - errorCapturingAdapter, - schema, - logCallback, - debug - ) - - return { - engine: engineInstance, - adapter: errorCapturingAdapter, - } -} - -main().catch(err); diff --git a/query-engine/driver-adapters/executor/src/types/jsonRpc.ts b/query-engine/driver-adapters/executor/src/types/jsonRpc.ts index 204aab3625c7..194150211d84 100644 --- a/query-engine/driver-adapters/executor/src/types/jsonRpc.ts +++ b/query-engine/driver-adapters/executor/src/types/jsonRpc.ts @@ -1,6 +1,6 @@ import * as S from '@effect/schema/Schema' -const SchemaId = S.number.pipe(S.int(), S.nonNegative()) +const SchemaId = S.number.pipe(S.int(), S.nonNegative()).pipe(S.brand('SchemaId')) export type SchemaId = S.Schema.Type const InitializeSchemaParams = S.struct({ diff --git a/query-engine/driver-adapters/executor/src/utils.ts b/query-engine/driver-adapters/executor/src/utils.ts index f46e44dd2d95..994960b47287 100644 --- a/query-engine/driver-adapters/executor/src/utils.ts +++ b/query-engine/driver-adapters/executor/src/utils.ts @@ -4,7 +4,7 @@ import { fileURLToPath } from 'node:url' export const __dirname = path.dirname(fileURLToPath(import.meta.url)) -export function copyPathName({ fromURL, toURL }: { fromURL: string, toURL: string }) { +export function copyPathName({ fromURL, toURL }: { fromURL: string; toURL: string }) { const toObj = new URL(toURL) toObj.pathname = new URL(fromURL).pathname @@ -16,16 +16,17 @@ export function postgresSchemaName(url: string) { } type PostgresOptions = { - connectionString: string, options?: string + connectionString: string + options?: string } -export function postgres_options(url: string): PostgresOptions { - let args: PostgresOptions = { connectionString: url } - +export function postgresOptions(url: string): PostgresOptions { + const args: PostgresOptions = { connectionString: url } + const schemaName = postgresSchemaName(url) - + if (schemaName != null) { - args.options = `--search_path="${schemaName}"` + args.options = `--search_path="${schemaName}"` } return args @@ -33,10 +34,27 @@ export function postgres_options(url: string): PostgresOptions { // Utility to avoid the `D1_ERROR: No SQL statements detected` error when running // `D1_DATABASE.batch` with an empty array of statements. -export async function runBatch(D1_DATABASE: D1Database, statements: D1PreparedStatement[]): Promise[]> { +export async function runBatch( + D1_DATABASE: D1Database, + statements: D1PreparedStatement[], +): Promise[]> { if (statements.length === 0) { return [] } return D1_DATABASE.batch(statements) } + +// conditional debug logging based on LOG_LEVEL env var +export const debug = (() => { + if ((process.env.LOG_LEVEL ?? '').toLowerCase() != 'debug') { + return (...args: any[]) => {} + } + + return (...args: any[]) => { + console.error('[nodejs] DEBUG:', ...args) + } +})() + +// error logger +export const err = (...args: any[]) => console.error('[nodejs] ERROR:', ...args) diff --git a/query-engine/driver-adapters/executor/src/wasm.ts b/query-engine/driver-adapters/executor/src/wasm.ts deleted file mode 100644 index c60d54f398c3..000000000000 --- a/query-engine/driver-adapters/executor/src/wasm.ts +++ /dev/null @@ -1,30 +0,0 @@ -import * as wasmPostgres from '../../../query-engine-wasm/pkg/postgresql/query_engine_bg.js' -import * as wasmMysql from '../../../query-engine-wasm/pkg/mysql/query_engine_bg.js' -import * as wasmSqlite from '../../../query-engine-wasm/pkg/sqlite/query_engine_bg.js' -import fs from 'node:fs/promises' -import path from 'node:path' -import { __dirname } from './utils' - -const wasm = { - postgres: wasmPostgres, - mysql: wasmMysql, - sqlite: wasmSqlite -} - -type EngineName = keyof typeof wasm - -const initializedModules = new Set() - -export async function getEngineForProvider(provider: EngineName) { - const engine = wasm[provider] - if (!initializedModules.has(provider)) { - const subDir = provider === 'postgres' ? 'postgresql' : provider - const bytes = await fs.readFile(path.resolve(__dirname, '..', '..', '..', 'query-engine-wasm', 'pkg', subDir, 'query_engine_bg.wasm')) - const module = new WebAssembly.Module(bytes) - const instance = new WebAssembly.Instance(module, { './query_engine_bg.js': engine }) - engine.__wbg_set_wasm(instance.exports); - initializedModules.add(provider) - } - - return engine.QueryEngine -}