From ddda8123b48db01270b019c1d352a41c5bda7495 Mon Sep 17 00:00:00 2001 From: bcoll Date: Wed, 15 Nov 2023 18:35:12 +0000 Subject: [PATCH] feat: implement and unit test `LocalRuntimeController` --- .../LocalRuntimeController.spec.ts | 986 ++++++++++++++++++ .../src/__vitests__/helpers/teardown.ts | 32 + .../src/__vitests__/helpers/unusable.ts | 30 + .../startDevWorker/LocalRuntimeController.ts | 562 +++++++++- .../src/deployment-bundle/source-url.ts | 2 +- 5 files changed, 1605 insertions(+), 7 deletions(-) create mode 100644 packages/wrangler/src/__vitests__/api/startDevWorker/LocalRuntimeController.spec.ts create mode 100644 packages/wrangler/src/__vitests__/helpers/teardown.ts create mode 100644 packages/wrangler/src/__vitests__/helpers/unusable.ts diff --git a/packages/wrangler/src/__vitests__/api/startDevWorker/LocalRuntimeController.spec.ts b/packages/wrangler/src/__vitests__/api/startDevWorker/LocalRuntimeController.spec.ts new file mode 100644 index 000000000000..4865d5068260 --- /dev/null +++ b/packages/wrangler/src/__vitests__/api/startDevWorker/LocalRuntimeController.spec.ts @@ -0,0 +1,986 @@ +// noinspection DuplicatedCode + +import assert from "node:assert"; +import events from "node:events"; +import fs from "node:fs"; +import net from "node:net"; +import path from "node:path"; +import util from "node:util"; +import { DeferredPromise, Response, fetch } from "miniflare"; +import { describe, it, expect } from "vitest"; +import { WebSocket } from "ws"; +import { LocalRuntimeController } from "../../../api/startDevWorker/LocalRuntimeController"; +import { teardown, useTmp } from "../../helpers/teardown"; +import { unusable } from "../../helpers/unusable"; +import type { + Bundle, + ReloadCompleteEvent, + StartDevWorkerOptions, + UrlOriginAndPathnameParts, + UrlOriginParts, +} from "../../../api"; + +// WebAssembly module containing single `func add(i32, i32): i32` export. +// Generated using https://webassembly.github.io/wabt/demo/wat2wasm/. +const WASM_ADD_MODULE = Buffer.from( + "AGFzbQEAAAABBwFgAn9/AX8DAgEABwcBA2FkZAAACgkBBwAgACABagsACgRuYW1lAgMBAAA=", + "base64" +); + +async function waitForReloadComplete( + controller: LocalRuntimeController +): Promise { + const [event] = await events.once(controller, "reloadComplete"); + return event; +} + +function joinUrlParts(parts: UrlOriginParts | UrlOriginAndPathnameParts): URL { + const pathname = "pathname" in parts ? parts.pathname : ""; + const spec = `${parts.protocol}//${parts.hostname}:${parts.port}${pathname}`; + return new URL(spec); +} + +function singleModuleBundle( + strings: TemplateStringsArray, + ...args: unknown[] +): Bundle { + return { + type: "modules", + modules: [ + { + type: "ESModule", + name: "index.mjs", + path: "/virtual/index.mjs", + contents: String.raw(strings, ...args), + }, + ], + }; +} + +describe("Core", () => { + it("should start Miniflare with module worker", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + compatibilityFlags: ["nodejs_compat"], + compatibilityDate: "2023-10-01", + }; + const bundle: Bundle = { + type: "modules", + modules: [ + { + type: "ESModule", + name: "index.mjs", + path: "/virtual/esm/index.mjs", + contents: ` + import add from "./add.cjs"; + import base64 from "./base64.cjs"; + import wave1 from "./data/wave.txt"; + import wave2 from "./data/wave.bin"; + export default { + fetch(request, env, ctx) { + const { pathname } = new URL(request.url); + if (pathname === "/") { + const wave2Text = new TextDecoder().decode(wave2); + return Response.json({ + message: base64.decode(base64.encode(wave1 + wave2Text)), + sum: add.add(1, 2), + }); + } else if (pathname === "/throw-commonjs") { + try { add.throw(); } catch (e) { return new Response(e.stack); } + } else if (pathname === "/throw-nodejs-compat-module") { + try { base64.throw(); } catch (e) { return new Response(e.stack); } + } else { + return new Response(null, { status: 404 }); + } + } + }`, + }, + { + type: "CommonJS", + name: "add.cjs", + path: "/virtual/cjs/add.cjs", + contents: ` + const addModule = require("./add.wasm"); + const addInstance = new WebAssembly.Instance(addModule); + module.exports = { + add: addInstance.exports.add, + throw() { + throw new Error("Oops!"); + } + } + `, + }, + { + type: "NodeJsCompatModule", + name: "base64.cjs", + path: "/virtual/node/base64.cjs", + contents: `module.exports = { + encode(value) { + return Buffer.from(value).toString("base64"); + }, + decode(value) { + return Buffer.from(value, "base64").toString(); + }, + throw() { + throw new Error("Oops!"); + } + }`, + }, + { type: "Text", name: "data/wave.txt", contents: "๐Ÿ‘‹" }, + { type: "Data", name: "data/wave.bin", contents: "๐ŸŒŠ" }, + { type: "CompiledWasm", name: "add.wasm", contents: WASM_ADD_MODULE }, + ], + }; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + const event = await waitForReloadComplete(controller); + const url = joinUrlParts(event.proxyData.userWorkerUrl); + + // Check all module types + let res = await fetch(url); + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ message: "๐Ÿ‘‹๐ŸŒŠ", sum: 3 }); + + // Check stack traces from ESModule and CommonJS modules include file path + res = await fetch(new URL("/throw-commonjs", url)); + expect(res.status).toBe(200); + expect(await res.text()).toMatchInlineSnapshot(` + "Error: Oops! + at Object.throw (file:///virtual/cjs/add.cjs:7:14) + at Object.fetch (file:///virtual/esm/index.mjs:16:24)" + `); + + // Check stack traces from NodeJsCompatModule modules include file path + res = await fetch(new URL("/throw-nodejs-compat-module", url)); + expect(res.status).toBe(200); + expect(await res.text()).toMatchInlineSnapshot(` + "Error: Oops! + at Object.throw (file:///virtual/node/base64.cjs:9:14) + at Object.fetch (file:///virtual/esm/index.mjs:18:27)" + `); + }); + it("should start Miniflare with service worker", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + }; + const bundle: Bundle = { + type: "service-worker", + serviceWorker: { + path: "/virtual/index.js", + contents: `addEventListener("fetch", (event) => { + const { pathname } = new URL(event.request.url); + if (pathname === "/") { + const addInstance = new WebAssembly.Instance(add_wasm); + const res = Response.json({ + one: data_one_txt, + two: new TextDecoder().decode(data_two_bin), + three: addInstance.exports.add(1, 2), + }); + event.respondWith(res) + } else if (pathname === "/throw") { + try { throw new Error("Oops!"); } catch (e) { event.respondWith(new Response(e.stack)); } + } else { + event.respondWith(new Response(null, { status: 404 })); + } + });`, + }, + modules: [ + { type: "Text", name: "data/one.txt", contents: "one" }, + { type: "Data", name: "data/two.bin", contents: "two" }, + { type: "CompiledWasm", name: "add.wasm", contents: WASM_ADD_MODULE }, + ], + }; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + const event = await waitForReloadComplete(controller); + const url = joinUrlParts(event.proxyData.userWorkerUrl); + + // Check additional modules added to global scope + let res = await fetch(url); + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ one: "one", two: "two", three: 3 }); + + // Check stack traces include file path + res = await fetch(new URL("/throw", url)); + expect(res.status).toBe(200); + expect(await res.text()).toMatchInlineSnapshot(` + "Error: Oops! + at file:///virtual/index.js:12:19" + `); + }); + it("should update the running Miniflare instance", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + function update(version: number) { + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { + VERSION: { type: "var", value: version }, + }, + }; + const bundle = singleModuleBundle`export default { + fetch(request, env, ctx) { + return Response.json({ binding: env.VERSION, bundle: ${version} }); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + } + + // Start worker + update(1); + let event = await waitForReloadComplete(controller); + let res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.json()).toEqual({ binding: 1, bundle: 1 }); + + // Update worker and check config/bundle updated + update(2); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.json()).toEqual({ binding: 2, bundle: 2 }); + + // Update worker multiple times and check only latest config/bundle used + const eventPromise = waitForReloadComplete(controller); + update(3); + update(4); + update(5); + event = await eventPromise; + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.json()).toEqual({ binding: 5, bundle: 5 }); + }); + it("should start Miniflare with configured compatibility settings", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + // `global_navigator` was enabled by default on `2022-03-21`: + // https://developers.cloudflare.com/workers/configuration/compatibility-dates/#global-navigator + const disabledDate = "2022-03-20"; + const enabledDate = "2022-03-21"; + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + compatibilityDate: disabledDate, + }; + const bundle = singleModuleBundle`export default { + fetch(request, env, ctx) { return new Response(typeof navigator); } + }`; + + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + let event = await waitForReloadComplete(controller); + let res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("undefined"); + + // Check respects compatibility date + config.compatibilityDate = enabledDate; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("object"); + + // Check respects compatibility flags + config.compatibilityDate = disabledDate; + config.compatibilityFlags = ["global_navigator"]; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("object"); + }); + it("should start inspector on random port and allow debugging", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + }; + const bundle = singleModuleBundle`export default { + fetch(request, env, ctx) { + debugger; + return new Response("body"); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + const event = await waitForReloadComplete(controller); + const url = joinUrlParts(event.proxyData.userWorkerUrl); + const inspectorUrl = joinUrlParts(event.proxyData.userWorkerInspectorUrl); + + // Connect inspector WebSocket + const ws = new WebSocket(inspectorUrl); + const messages = events.on(ws, "message"); + async function nextMessage() { + const messageEvent = (await messages.next()).value; + return JSON.parse(messageEvent[0].toString()); + } + + // Enable `Debugger` domain + await events.once(ws, "open"); + ws.send(JSON.stringify({ id: 0, method: "Debugger.enable" })); + expect(await nextMessage()).toMatchObject({ + method: "Debugger.scriptParsed", + params: { url: "file:///virtual/index.mjs" }, + }); + expect(await nextMessage()).toMatchObject({ id: 0 }); + + // Send request and hit `debugger;` statement + const resPromise = fetch(url); + expect(await nextMessage()).toMatchObject({ method: "Debugger.paused" }); + + // Resume execution + ws.send(JSON.stringify({ id: 1, method: "Debugger.resume" })); + expect(await nextMessage()).toMatchObject({ id: 1 }); + const res = await resPromise; + expect(await res.text()).toBe("body"); + }); +}); + +describe("Bindings", () => { + it("should expose basic bindings", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { + TEXT: { type: "var", value: "text" }, + OBJECT: { type: "var", value: { a: { b: 1 } } }, + DATA: { type: "var", value: new Uint8Array([1, 2, 3]) }, + }, + }; + const bundle = singleModuleBundle`export default { + fetch(request, env, ctx) { + const body = JSON.stringify(env, (key, value) => { + if (value instanceof ArrayBuffer) { + return { $type: "ArrayBuffer", value: Array.from(new Uint8Array(value)) }; + } + return value; + }); + return new Response(body); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + const event = await waitForReloadComplete(controller); + const res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.json()).toEqual({ + TEXT: "text", + OBJECT: { a: { b: 1 } }, + DATA: { $type: "ArrayBuffer", value: [1, 2, 3] }, + }); + }); + it("should expose WebAssembly module bindings in service workers", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { + // `wasm-module` bindings aren't allowed in modules workers + WASM: { type: "wasm-module", source: { contents: WASM_ADD_MODULE } }, + }, + }; + const bundle: Bundle = { + type: "service-worker", + serviceWorker: { + contents: `addEventListener("fetch", (event) => { + const addInstance = new WebAssembly.Instance(WASM); + event.respondWith(new Response(addInstance.exports.add(1, 2))); + });`, + }, + }; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + const event = await waitForReloadComplete(controller); + const res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("3"); + }); + it("should persist cached data", async () => { + const tmp = useTmp(); + const persist = path.join(tmp, "persist"); + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + dev: { persist: { path: persist } }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + const key = "http://localhost/"; + if (request.method === "POST") { + const response = new Response("cached", { + headers: { "Cache-Control": "max-age=3600" } + }); + await caches.default.put(key, response); + } + return (await caches.default.match(key)) ?? new Response("miss"); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + let event = await waitForReloadComplete(controller); + let res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl), { + method: "POST", + }); + expect(await res.text()).toBe("cached"); + + // Check restarting uses persisted data + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("cached"); + + // Check deleting persistence directory removes data + await controller.teardown(); + fs.rmSync(persist, { recursive: true }); + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("miss"); + }); + it("should expose KV namespace bindings", async () => { + const tmp = useTmp(); + const persist = path.join(tmp, "persist"); + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { NAMESPACE: { type: "kv", id: "ns" } }, + dev: { persist: { path: persist } }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + if (request.method === "POST") await env.NAMESPACE.put("key", "value"); + return new Response(await env.NAMESPACE.get("key")); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + let event = await waitForReloadComplete(controller); + let res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl), { + method: "POST", + }); + expect(await res.text()).toBe("value"); + + // Check restarting uses persisted data + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("value"); + + // Check deleting persistence directory removes data + await controller.teardown(); + fs.rmSync(persist, { recursive: true }); + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe(""); + }); + it("should support Workers Sites bindings", async () => { + const tmp = useTmp(); + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + fs.writeFileSync(path.join(tmp, "company.txt"), "๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ"); + fs.writeFileSync(path.join(tmp, "charts.xlsx"), "๐Ÿ“Š"); + fs.writeFileSync(path.join(tmp, "secrets.txt"), "๐Ÿ”"); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + site: { path: tmp, include: ["*.txt"] }, + }; + const bundle = singleModuleBundle` + import manifestJSON from "__STATIC_CONTENT_MANIFEST"; + const manifest = JSON.parse(manifestJSON); + export default { + async fetch(request, env, ctx) { + const { pathname } = new URL(request.url); + const path = pathname.substring(1); + const key = manifest[path]; + if (key === undefined) return new Response(null, { status: 404 }); + const value = await env.__STATIC_CONTENT.get(key, "stream"); + if (value === null) return new Response(null, { status: 404 }); + return new Response(value); + } + }`; + + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + let event = await waitForReloadComplete(controller); + let url = joinUrlParts(event.proxyData.userWorkerUrl); + let res = await fetch(new URL("/company.txt", url)); + expect(res.status).toBe(200); + expect(await res.text()).toBe("๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ"); + res = await fetch(new URL("/charts.xlsx", url)); + expect(res.status).toBe(404); + res = await fetch(new URL("/secrets.txt", url)); + expect(res.status).toBe(200); + + config.site = { path: tmp, exclude: ["secrets.txt"] }; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + url = joinUrlParts(event.proxyData.userWorkerUrl); + res = await fetch(new URL("/company.txt", url)); + expect(res.status).toBe(200); + res = await fetch(new URL("/charts.xlsx", url)); + expect(res.status).toBe(200); + res = await fetch(new URL("/secrets.txt", url)); + expect(res.status).toBe(404); + }); + it("should expose R2 bucket bindings", async () => { + const tmp = useTmp(); + const persist = path.join(tmp, "persist"); + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { BUCKET: { type: "r2", bucket_name: "bucket" } }, + dev: { persist: { path: persist } }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + if (request.method === "POST") await env.BUCKET.put("key", "value"); + const object = await env.BUCKET.get("key"); + return new Response(object?.body); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + let event = await waitForReloadComplete(controller); + let res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl), { + method: "POST", + }); + expect(await res.text()).toBe("value"); + + // Check restarting uses persisted data + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe("value"); + + // Check deleting persistence directory removes data + await controller.teardown(); + fs.rmSync(persist, { recursive: true }); + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.text()).toBe(""); + }); + it("should expose D1 database bindings", async () => { + const tmp = useTmp(); + const persist = path.join(tmp, "persist"); + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { + DB: { type: "d1", database_name: "db-name", database_id: "db" }, + }, + dev: { persist: { path: persist } }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + await env.DB.exec("CREATE TABLE IF NOT EXISTS entries (key text PRIMARY KEY, value text)"); + if (request.method === "POST") { + await env.DB.prepare("INSERT INTO entries (key, value) VALUES (?, ?)").bind("key", "value").run(); + } + const result = await env.DB.prepare("SELECT * FROM entries").all(); + return Response.json(result.results); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + let event = await waitForReloadComplete(controller); + let res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl), { + method: "POST", + }); + expect(await res.json()).toEqual([{ key: "key", value: "value" }]); + + // Check restarting uses persisted data + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.json()).toEqual([{ key: "key", value: "value" }]); + + // Check deleting persistence directory removes data + await controller.teardown(); + fs.rmSync(persist, { recursive: true }); + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(await res.json()).toEqual([]); + }); + it("should expose queue producer bindings and consume queue messages", async () => { + const tmp = useTmp(); + const persist = path.join(tmp, "persist"); + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const reportPromise = new DeferredPromise(); + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { + QUEUE: { type: "queue-producer", name: "queue" }, + BATCH_REPORT: { + type: "service", + async service(request) { + reportPromise.resolve(await request.json()); + return new Response(null, { status: 204 }); + }, + }, + }, + triggers: [{ type: "queue-consumer", name: "queue", maxBatchTimeout: 0 }], + dev: { persist: { path: persist } }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + await env.QUEUE.send("message"); + return new Response(null, { status: 204 }); + }, + async queue(batch, env, ctx) { + await env.BATCH_REPORT.fetch("http://placeholder", { + method: "POST", + body: JSON.stringify(batch.messages.map(({ body }) => body)) + }); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + const event = await waitForReloadComplete(controller); + const res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl), { + method: "POST", + }); + expect(res.status).toBe(204); + expect(await reportPromise).toEqual(["message"]); + }); + it("should expose hyperdrive bindings", async () => { + // Start echo TCP server + const server = net.createServer((socket) => socket.pipe(socket)); + const listeningPromise = events.once(server, "listening"); + server.listen(0, "127.0.0.1"); + teardown(() => util.promisify(server.close.bind(server))()); + await listeningPromise; + const address = server.address(); + assert(typeof address === "object" && address !== null); + const port = address.port; + + // Start runtime with hyperdrive binding + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const localConnectionString = `postgres://username:password@127.0.0.1:${port}/db`; + const config: StartDevWorkerOptions = { + name: "worker", + script: unusable(), + bindings: { DB: { type: "hyperdrive", id: "db", localConnectionString } }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + const socket = env.DB.connect(); + const writer = socket.writable.getWriter(); + await writer.write(new TextEncoder().encode("๐Ÿ‘‹")); + await writer.close(); + return new Response(socket.readable); + } + }`; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + + const event = await waitForReloadComplete(controller); + const res = await fetch(joinUrlParts(event.proxyData.userWorkerUrl)); + expect(res.status).toBe(200); + expect(await res.text()).toBe("๐Ÿ‘‹"); + }); +}); + +describe("Multi-Worker Bindings", () => { + it("should expose service bindings to other workers", async () => { + const controller = new LocalRuntimeController(); + teardown(() => controller.teardown()); + + const config: StartDevWorkerOptions = { + name: "a", + script: unusable(), + bindings: { + A: { type: "service", service: { name: "a" } }, // Self binding + B: { + type: "service", + service(request) { + const body = `b:${request.url}`; + return new Response(body); + }, + }, + C: { type: "service", service: { name: "c" } }, + D: { type: "service", service: { name: "d" } }, + E: { type: "service", service: { name: "e" } }, // Invalid binding + }, + dev: { + getRegisteredWorker(name) { + if (!["c", "d"].includes(name)) return undefined; + return (request) => { + const body = `registered:${name}:${request.url}`; + return new Response(body); + }; + }, + }, + }; + const bundle = singleModuleBundle`export default { + async fetch(request, env, ctx) { + const { pathname } = new URL(request.url); + const name = pathname.substring(1); + const res = await env[name]?.fetch("http://placeholder/"); + return new Response("a:" + await res?.text()); + } + }`; + + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + let event = await waitForReloadComplete(controller); + let url = joinUrlParts(event.proxyData.userWorkerUrl); + let res = await fetch(new URL("/A", url)); + expect(await res.text()).toBe("a:a:undefined"); + res = await fetch(new URL("/B", url)); + expect(await res.text()).toBe("a:b:http://placeholder/"); + res = await fetch(new URL("/C", url)); + expect(await res.text()).toBe("a:registered:c:http://placeholder/"); + res = await fetch(new URL("/D", url)); + expect(await res.text()).toBe("a:registered:d:http://placeholder/"); + res = await fetch(new URL("/E", url)); + expect(await res.text()).toMatchInlineSnapshot( + '"a:[wrangler] Couldn\'t find `wrangler dev` session for service \\"e\\" to proxy to`"' + ); + + // Check with no `getRegisteredWorker()` function defined + config.dev = {}; + controller.onBundleStart({ type: "bundleStart", config }); + controller.onBundleComplete({ type: "bundleComplete", config, bundle }); + event = await waitForReloadComplete(controller); + url = joinUrlParts(event.proxyData.userWorkerUrl); + res = await fetch(new URL("/C", url)); + expect(await res.text()).toMatchInlineSnapshot( + '"a:[wrangler] Couldn\'t find `wrangler dev` session for service \\"c\\" to proxy to`"' + ); + }); + it("should expose Durable Object bindings to other workers", async () => { + const controllerA = new LocalRuntimeController(); + teardown(() => controllerA.teardown()); + + // Start entry worker + let urlB: URL | undefined = undefined; + const configA: StartDevWorkerOptions = { + name: "a", + script: unusable(), + compatibilityFlags: ["no_cf_botmanagement_default"], + bindings: { + A_OBJECT_1: { + // Binding to object in self without `service` + type: "durable-object", + className: "AObject1", + }, + A_OBJECT_2: { + // Binding to object in self with `service` + type: "durable-object", + className: "AObject2", + service: { name: "a" }, + }, + B_OBJECT_1: { + // Binding to object in another service + type: "durable-object", + className: "BObject1", + service: { name: "b" }, + }, + B_OBJECT_2: { + // Binding to non-existent object in another service + type: "durable-object", + className: "BObject2", + service: { name: "b" }, + }, + C_OBJECT_1: { + // Binding to object in non-existent service + type: "durable-object", + className: "CObject1", + service: { name: "c" }, + }, + }, + dev: { + getRegisteredWorker(name) { + if (name !== "b") return undefined; + return (request) => { + assert(urlB !== undefined); + const url = new URL(request.url); + url.protocol = urlB.protocol; + url.host = urlB.host; + return fetch(url, request); + }; + }, + }, + }; + const objectClassBody = `{ + async fetch(request) { + return Response.json({ + source: this.constructor.name, + method: request.method, + url: request.url, + headers: Object.fromEntries(request.headers), + cf: request.cf, + body: await request.text(), + }); + } + }`; + const bundleA = singleModuleBundle` + export class AObject1 ${objectClassBody} + export class AObject2 extends AObject1 {} + export default { + fetch(request, env, ctx) { + const { pathname } = new URL(request.url); + const name = pathname.substring(1); + const ns = env[name]; + if (ns === undefined) return new Response(null, { status: 404 }); + + const id = ns.newUniqueId(); + const stub = ns.get(id); + return stub.fetch("http://placeholder/", { + method: "POST", + headers: { "Content-Type": "text/plain" }, + cf: { secret: "๐Ÿ”‘" }, + body: "๐Ÿฉป", + }); + } + }`; + controllerA.onBundleStart({ type: "bundleStart", config: configA }); + controllerA.onBundleComplete({ + type: "bundleComplete", + config: configA, + bundle: bundleA, + }); + let eventA = await waitForReloadComplete(controllerA); + let urlA = joinUrlParts(eventA.proxyData.userWorkerUrl); + + // Start other worker + const controllerB = new LocalRuntimeController(); + teardown(() => controllerB.teardown()); + const configB: StartDevWorkerOptions = { + name: "b", + script: unusable(), + compatibilityFlags: ["no_cf_botmanagement_default"], + bindings: { + B_OBJECT_1: { type: "durable-object", className: "BObject1" }, + }, + }; + const bundleB = singleModuleBundle`export class BObject1 ${objectClassBody};`; + controllerB.onBundleStart({ type: "bundleStart", config: configB }); + controllerB.onBundleComplete({ + type: "bundleComplete", + config: configB, + bundle: bundleB, + }); + const eventB = await waitForReloadComplete(controllerB); + urlB = joinUrlParts(eventB.proxyData.userWorkerUrl); + + // Check objects in entry worker + let res = await fetch(new URL("/A_OBJECT_1", urlA)); + expect(await res.json()).toMatchInlineSnapshot(` + { + "body": "๐Ÿฉป", + "cf": { + "secret": "๐Ÿ”‘", + }, + "headers": { + "content-length": "4", + "content-type": "text/plain", + }, + "method": "POST", + "source": "AObject1", + "url": "http://placeholder/", + } + `); + res = await fetch(new URL("/A_OBJECT_2", urlA)); + expect(await res.json()).toMatchObject({ source: "AObject2" }); + + // Check objects in other worker + res = await fetch(new URL("/B_OBJECT_1", urlA)); + expect(await res.json()).toMatchInlineSnapshot(` + { + "body": "๐Ÿฉป", + "cf": { + "secret": "๐Ÿ”‘", + }, + "headers": { + "content-length": "4", + "content-type": "text/plain", + }, + "method": "POST", + "source": "BObject1", + "url": "http://placeholder/", + } + `); + + // Check missing Durable Object class + res = await fetch(new URL("/B_OBJECT_2", urlA)); + expect(await res.text()).toMatchInlineSnapshot( + '"[wrangler] Couldn\'t find class \\"BObject2\\" in service \\"b\\" to proxy to"' + ); + + // Check missing service + res = await fetch(new URL("/C_OBJECT_1", urlA)); + expect(await res.text()).toMatchInlineSnapshot( + '"[wrangler] Couldn\'t find `wrangler dev` session for service \\"c\\" to proxy to`"' + ); + + // Check with no `getRegisteredWorker()` function defined + configA.dev = {}; + controllerA.onBundleStart({ type: "bundleStart", config: configA }); + controllerA.onBundleComplete({ + type: "bundleComplete", + config: configA, + bundle: bundleA, + }); + eventA = await waitForReloadComplete(controllerA); + urlA = joinUrlParts(eventA.proxyData.userWorkerUrl); + res = await fetch(new URL("/B_OBJECT_1", urlA)); + expect(await res.text()).toMatchInlineSnapshot( + '"[wrangler] Couldn\'t find `wrangler dev` session for service \\"b\\" to proxy to`"' + ); + }); +}); diff --git a/packages/wrangler/src/__vitests__/helpers/teardown.ts b/packages/wrangler/src/__vitests__/helpers/teardown.ts new file mode 100644 index 000000000000..51821d840106 --- /dev/null +++ b/packages/wrangler/src/__vitests__/helpers/teardown.ts @@ -0,0 +1,32 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach } from "vitest"; + +const teardownCallbacks: (() => void | Promise)[] = []; +export function teardown(callback: () => void | Promise) { + // `unshift()` so teardown callbacks executed in reverse + teardownCallbacks.unshift(callback); +} + +afterEach(async () => { + const errors: unknown[] = []; + for (const callback of teardownCallbacks.splice(0)) { + try { + await callback(); + } catch (error) { + errors.push(error); + } + } + if (errors.length > 0) + throw new AggregateError( + errors, + ["Unable to teardown:", ...errors.map(String)].join("\n") + ); +}); + +export function useTmp() { + const tmp = fs.mkdtempSync(path.join(os.tmpdir(), "wrangler-vitest-")); + teardown(() => fs.rmSync(tmp, { recursive: true, force: true })); + return tmp; +} diff --git a/packages/wrangler/src/__vitests__/helpers/unusable.ts b/packages/wrangler/src/__vitests__/helpers/unusable.ts new file mode 100644 index 000000000000..83564eb244fb --- /dev/null +++ b/packages/wrangler/src/__vitests__/helpers/unusable.ts @@ -0,0 +1,30 @@ +export function unusable(): T { + return new Proxy({} as T, { + apply() { + throw new TypeError("Attempted to call unusable object"); + }, + construct() { + throw new TypeError("Attempted to construct unusable object"); + }, + deleteProperty(_target, prop) { + throw new TypeError( + `Attempted to delete "${String(prop)}" on unusable object` + ); + }, + get(_target, prop) { + throw new TypeError( + `Attempted to get "${String(prop)}" on unusable object` + ); + }, + has(_target, prop) { + throw new TypeError( + `Attempted to check for "${String(prop)}" on unusable object` + ); + }, + set(_target, prop) { + throw new TypeError( + `Attempted to set "${String(prop)}" on unusable object` + ); + }, + }); +} diff --git a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts index 16751df9067e..f5294173b874 100644 --- a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts +++ b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts @@ -1,5 +1,14 @@ +import assert from "node:assert"; +import fs from "node:fs"; +import path from "node:path"; +import { Log, LogLevel, Response, Miniflare, Mutex } from "miniflare"; +import { withSourceURL } from "../../deployment-bundle/source-url"; +import { getLocalPersistencePath } from "../../dev/get-local-persistence-path"; +import { logger } from "../../logger"; +import { updateCheck } from "../../update-check"; import { RuntimeController } from "./BaseController"; -import { notImplemented } from "./NotImplementedError"; +import { castErrorCause } from "./events"; +import type { LoggerLevel } from "../../logger"; import type { BundleCompleteEvent, BundleStartEvent, @@ -7,24 +16,565 @@ import type { ReloadCompleteEvent, ReloadStartEvent, } from "./events"; +import type { + Binding, + ServiceDesignator, + StartDevWorkerOptions, + Bundle, + File, +} from "./types"; +import type { + MiniflareOptions, + SourceOptions, + WorkerOptions, + SharedOptions, +} from "miniflare"; + +const warnOnceMessages = new Set(); +function warnOnce(message: string) { + if (warnOnceMessages.has(message)) return; + warnOnceMessages.add(message); + logger.warn(message); +} + +function getBinaryFileContents(file: File) { + if ("contents" in file) { + if (file.contents instanceof Buffer) return file.contents; + return Buffer.from(file.contents); + } + return fs.readFileSync(file.path); +} +function getTextFileContents(file: File) { + if ("contents" in file) { + if (typeof file.contents === "string") return file.contents; + if (file.contents instanceof Buffer) return file.contents.toString(); + return Buffer.from(file.contents).toString(); + } + return fs.readFileSync(file.path, "utf8"); +} + +// This worker proxies all external Durable Objects to the Wrangler session +// where they're defined, and receives all requests from other Wrangler sessions +// for this session's Durable Objects. Note the original request URL may contain +// non-standard protocols, so we store it in a header to restore later. +const EXTERNAL_DURABLE_OBJECTS_WORKER_NAME = + "__WRANGLER_EXTERNAL_DURABLE_OBJECTS_WORKER"; +const EXTERNAL_DURABLE_OBJECTS_GET_REGISTERED_WORKER = + "__EXTERNAL_DURABLE_OBJECTS_GET_REGISTERED_WORKER"; +const EXTERNAL_DURABLE_OBJECTS_CONTROL_HEADER = "Wrangler-MF-Durable-Object"; +const EXTERNAL_DURABLE_OBJECTS_WORKER_SCRIPT = ` +function createClass({ scriptName, className }) { + return class { + constructor(state, env) { + this.id = state.id.toString(); + this.env = env; + } + fetch(request) { + const control = { + scriptName, + className, + id: this.id, + url: request.url, + // Ensure exact headers and cf object forwarded to Durable Object + headers: Array.from(request.headers.entries()), + cf: request.cf, + }; + // TODO(someday): fix this for GET requests with bodies. This is a restriction of the Fetch API that workerd has + // workarounds for, but undici doesn't. See https://github.com/cloudflare/workerd/issues/1122 for more details. + // If we wanted to support this, we'd likely need to use undici's dispatch() or Node's http module directly when + // making requests to other dev sessions. + return this.env.${EXTERNAL_DURABLE_OBJECTS_GET_REGISTERED_WORKER}.fetch("http://placeholder/${EXTERNAL_DURABLE_OBJECTS_WORKER_NAME}", { + method: request.method, + headers: { "${EXTERNAL_DURABLE_OBJECTS_CONTROL_HEADER}": JSON.stringify(control) }, + body: request.body, + }); + } + } +} + +export default { + fetch(request, env) { + const controlHeader = request.headers.get("${EXTERNAL_DURABLE_OBJECTS_CONTROL_HEADER}"); + if (controlHeader === null) { + return new Response("[wrangler] Received Durable Object proxy request missing control information. Ensure all your dev sessions are using the same version of Wrangler.", { status: 400 }); + } + const { scriptName, className, id, url, headers, cf } = JSON.parse(controlHeader); + request = new Request(url, { + method: request.method, + headers, + cf, + body: request.body, + }); + const ns = env[className]; + if (ns === undefined) { + return new Response(\`[wrangler] Couldn't find class "\${className}" in service "\${scriptName}" to proxy to\`, { status: 503 }); + } + const idObject = ns.idFromString(id); + const stub = ns.get(idObject); + return stub.fetch(request); + } +} +`; + +class WranglerLog extends Log { + #warnedCompatibilityDateFallback = false; + + log(message: string) { + // Hide request logs for external Durable Objects proxy worker + if (message.includes(EXTERNAL_DURABLE_OBJECTS_WORKER_NAME)) return; + super.log(message); + } + + warn(message: string) { + // Only log warning about requesting a compatibility date after the workerd + // binary's version once, and only if there's an update available. + if (message.startsWith("The latest compatibility date supported by")) { + if (this.#warnedCompatibilityDateFallback) return; + this.#warnedCompatibilityDateFallback = true; + return void updateCheck().then((maybeNewVersion) => { + if (maybeNewVersion === undefined) return; + message += [ + "", + "Features enabled by your requested compatibility date may not be available.", + `Upgrade to \`wrangler@${maybeNewVersion}\` to remove this warning.`, + ].join("\n"); + super.warn(message); + }); + } + super.warn(message); + } + + // TODO(soon): remove this override when miniflare is fixed: + // https://jira.cfdata.org/browse/DEVX-983 + error(message: Error) { + try { + super.error(message); + } catch { + // Miniflare shouldn't throw in `Log#error()`. For now, ignore errors. + } + } +} + +export const DEFAULT_WORKER_NAME = "worker"; +function getName(config: StartDevWorkerOptions) { + return config.name ?? DEFAULT_WORKER_NAME; +} +const IDENTIFIER_UNSAFE_REGEXP = /[^a-zA-Z0-9_$]/g; +function getIdentifier(name: string) { + return name.replace(IDENTIFIER_UNSAFE_REGEXP, "_"); +} + +function castLogLevel(level: LoggerLevel): LogLevel { + let key = level.toUpperCase() as Uppercase; + if (key === "LOG") key = "INFO"; + return LogLevel[key]; +} + +function buildLog(): Log { + let level = castLogLevel(logger.loggerLevel); + // Clamp log level to WARN, so we don't show request logs for user worker + level = Math.min(level, LogLevel.WARN); + return new WranglerLog(level, { prefix: "wrangler-UserWorker" }); +} + +function buildSourceOptions(bundle: Bundle): SourceOptions { + if (bundle.type === "service-worker") { + // Miniflare will handle adding `//# sourceURL` comments if they're missing + const script = getTextFileContents(bundle.serviceWorker); + return { script, scriptPath: bundle.serviceWorker.path }; + } else { + const modulesRoot = path.dirname(bundle.modules[0].name); + type Module = Extract[number]; + const modules = bundle.modules.map((module) => { + let contents: string | Uint8Array | undefined; + if ( + module.type === "ESModule" || + module.type === "CommonJS" || + module.type === "NodeJsCompatModule" + ) { + contents = getTextFileContents(module); + if (module.path !== undefined) { + contents = withSourceURL(contents, module.path); + } + } else { + contents = getBinaryFileContents(module); + } + return { + type: module.type, + path: path.resolve(modulesRoot, module.name), + contents, + }; + }); + return { modulesRoot, modules }; + } +} + +function isUnsafeBindingType(type: string): type is `unsafe-${string}` { + return type.startsWith("unsafe-"); +} +function buildBindingOptions(event: BundleCompleteEvent) { + const jsonBindings: NonNullable = {}; + const textBlobBindings: NonNullable = {}; + const dataBlobBindings: NonNullable = {}; + const wasmBindings: NonNullable = {}; + const kvNamespaces: NonNullable = {}; + const r2Buckets: NonNullable = {}; + const d1Databases: NonNullable = {}; + const queueProducers: NonNullable = {}; + const hyperdrives: NonNullable = {}; + const serviceBindings: NonNullable = {}; + + type DurableObjectBinding = { + name: string; + binding: Extract; + }; + const internalObjects: DurableObjectBinding[] = []; + const externalObjects: (DurableObjectBinding & { + binding: { service: ServiceDesignator }; // External so must have a service + })[] = []; + + const bindings = event.config.bindings; + const getRegisteredWorker = event.config.dev?.getRegisteredWorker; + for (const [name, binding] of Object.entries(bindings ?? {})) { + if (binding.type === "kv") { + kvNamespaces[name] = binding.id; + } else if (binding.type === "r2") { + r2Buckets[name] = binding.bucket_name; + } else if (binding.type === "d1") { + d1Databases[name] = binding.preview_database_id ?? binding.database_id; + } else if (binding.type === "durable-object") { + // Partition Durable Objects based on whether they're internal (defined by + // this session's worker), or external (defined by another session's + // worker registered in the dev registry) + const internal = + binding.service === undefined || + binding.service.name === event.config.name; + (internal ? internalObjects : externalObjects).push({ name, binding }); + } else if (binding.type === "service") { + if (typeof binding.service === "function") { + serviceBindings[name] = binding.service; + } else if (binding.service.name === event.config.name) { + serviceBindings[name] = binding.service.name; + } else { + const serviceName = binding.service.name; + serviceBindings[name] = (request) => { + const registeredFetch = getRegisteredWorker?.(serviceName); + if (registeredFetch !== undefined) return registeredFetch(request); + return new Response( + `[wrangler] Couldn't find \`wrangler dev\` session for service "${serviceName}" to proxy to\``, + { status: 503 } + ); + }; + } + } else if (binding.type === "queue-producer") { + queueProducers[name] = binding.name; + } else if (binding.type === "constellation") { + warnOnce( + "Miniflare does not support Constellation bindings yet, ignoring..." + ); + } else if (binding.type === "var") { + if (binding.value instanceof Uint8Array) { + dataBlobBindings[name] = binding.value; + } else { + jsonBindings[name] = binding.value; + } + } else if (binding.type === "wasm-module") { + wasmBindings[name] = getBinaryFileContents(binding.source); + } else if (binding.type === "hyperdrive") { + if (binding.localConnectionString !== undefined) { + hyperdrives[name] = binding.localConnectionString; + } + } else if (isUnsafeBindingType(binding.type)) { + warnOnce("Miniflare does not support unsafe bindings, ignoring..."); + } else { + const _exhaustive: never = binding.type; + } + } + + // Setup blob and module bindings + if (event.bundle.type === "service-worker") { + // For the service-worker format, blobs are accessible on the global scope + for (const module of event.bundle.modules ?? []) { + const identifier = getIdentifier(module.name); + if (module.type === "Text") { + jsonBindings[identifier] = getTextFileContents(module); + } else if (module.type === "Data") { + dataBlobBindings[identifier] = getBinaryFileContents(module); + } else if (module.type === "CompiledWasm") { + wasmBindings[identifier] = getBinaryFileContents(module); + } + } + } + + // Setup Durable Object bindings and proxy worker + const externalDurableObjectWorker: WorkerOptions = { + name: EXTERNAL_DURABLE_OBJECTS_WORKER_NAME, + // Bind all internal objects, so they're accessible by all other sessions + // that proxy requests for our objects to this worker + durableObjects: Object.fromEntries( + internalObjects.map(({ binding }) => [ + binding.className, + { className: binding.className, scriptName: getName(event.config) }, + ]) + ), + // Setup service binding for Durable Objects to call `getRegisteredWorker()` + serviceBindings: { + [EXTERNAL_DURABLE_OBJECTS_GET_REGISTERED_WORKER](request) { + const controlHeader = request.headers.get( + EXTERNAL_DURABLE_OBJECTS_CONTROL_HEADER + ); + assert(controlHeader !== null); + const { scriptName } = JSON.parse(controlHeader); + const registeredFetch = getRegisteredWorker?.(scriptName); + if (registeredFetch !== undefined) return registeredFetch(request); + return new Response( + `[wrangler] Couldn't find \`wrangler dev\` session for service "${scriptName}" to proxy to\``, + { status: 503 } + ); + }, + }, + // Use this worker instead of the user worker if the pathname is + // `/${EXTERNAL_DURABLE_OBJECTS_WORKER_NAME}` + // TODO(soon): consider using `/cdn-cgi/...` path here, if we switched, + // we'd lose compatibility with other Wrangler 3 versions + routes: [`*/${EXTERNAL_DURABLE_OBJECTS_WORKER_NAME}`], + // Use in-memory storage for the stub object classes *declared* by this + // script. They don't need to persist anything, and would end up using the + // incorrect unsafe unique key. + unsafeEphemeralDurableObjects: true, + // Make sure we use the provided `cf` object as is + compatibilityFlags: ["no_cf_botmanagement_default"], + modules: true, + script: + EXTERNAL_DURABLE_OBJECTS_WORKER_SCRIPT + + // Add stub object classes that proxy requests to the correct session + externalObjects + .map(({ binding }) => { + const identifier = getIdentifier( + `${binding.service.name}_${binding.className}` + ); + const scriptNameJson = JSON.stringify(binding.service.name); + const classNameJson = JSON.stringify(binding.className); + return `export const ${identifier} = createClass({ scriptName: ${scriptNameJson}, className: ${classNameJson} });`; + }) + .join("\n"), + }; + + const bindingOptions = { + bindings: jsonBindings, + textBlobBindings, + dataBlobBindings, + wasmBindings, + + kvNamespaces, + r2Buckets, + d1Databases, + queueProducers, + hyperdrives, + + serviceBindings, + durableObjects: Object.fromEntries([ + ...internalObjects.map(({ name, binding }) => [name, binding.className]), + ...externalObjects.map(({ name, binding }) => { + const identifier = getIdentifier( + `${binding.service.name}_${binding.className}` + ); + return [ + name, + { + className: identifier, + scriptName: EXTERNAL_DURABLE_OBJECTS_WORKER_NAME, + // Matches the unique key Miniflare will generate for this object in + // the target session. We need to do this so workerd generates the + // same IDs it would if this were part of the same process. workerd + // doesn't allow IDs from Durable Objects with different unique keys + // to be used with each other. + unsafeUniqueKey: `${binding.service.name}-${binding.className}`, + }, + ]; + }), + ]), + }; + + return { bindingOptions, externalDurableObjectWorker }; +} + +function buildTriggerOptions(config: StartDevWorkerOptions) { + const queueConsumers: NonNullable = {}; + for (const trigger of config.triggers ?? []) { + if (trigger.type === "workers.dev" || trigger.type === "route") { + // Ignore HTTP triggers, we just handle any HTTP request + } else if (trigger.type === "schedule") { + warnOnce("Miniflare does not support CRON triggers yet, ignoring..."); + } else if (trigger.type === "queue-consumer") { + queueConsumers[trigger.name] = { + maxBatchSize: trigger.maxBatchSize, + maxBatchTimeout: trigger.maxBatchTimeout, + maxRetries: trigger.maxRetries, + deadLetterQueue: trigger.deadLetterQueue, + }; + } else { + const _exhaustive: never = trigger; + } + } + return { queueConsumers }; +} + +type PickTemplate = { + [P in keyof T & K]: T[P]; +}; +type PersistOptions = Required>; +function buildPersistOptions( + config: StartDevWorkerOptions +): PersistOptions | undefined { + const persist = config.dev?.persist ?? false; + if (persist === false) return; + const persistTo = persist === true ? undefined : persist.path; + const configPath = config.config?.path; + const localPersistencePath = getLocalPersistencePath(persistTo, configPath); + const v3Path = path.join(localPersistencePath, "v3"); + return { + cachePersist: path.join(v3Path, "cache"), + durableObjectsPersist: path.join(v3Path, "do"), + kvPersist: path.join(v3Path, "kv"), + r2Persist: path.join(v3Path, "r2"), + d1Persist: path.join(v3Path, "d1"), + }; +} + +function buildSitesOptions(config: StartDevWorkerOptions) { + if (config.site !== undefined) { + return { + sitePath: config.site.path, + siteInclude: config.site.include, + siteExclude: config.site.exclude, + }; + } +} + +function buildMiniflareOptions( + log: Log, + event: BundleCompleteEvent +): MiniflareOptions { + const sourceOptions = buildSourceOptions(event.bundle); + const { bindingOptions, externalDurableObjectWorker } = + buildBindingOptions(event); + const triggerOptions = buildTriggerOptions(event.config); + const sitesOptions = buildSitesOptions(event.config); + const persistOptions = buildPersistOptions(event.config); + + return { + host: "127.0.0.1", + inspectorPort: 0, + // upstream, + + log, + verbose: logger.loggerLevel === "debug", + + ...persistOptions, + workers: [ + { + name: getName(event.config), + compatibilityDate: event.config.compatibilityDate, + compatibilityFlags: event.config.compatibilityFlags, + + ...sourceOptions, + ...bindingOptions, + ...triggerOptions, + ...sitesOptions, + }, + externalDurableObjectWorker, + ], + }; +} export class LocalRuntimeController extends RuntimeController { // ****************** // Event Handlers // ****************** + #log = buildLog(); + #currentBundleId = 0; + + #mutex = new Mutex(); + #mf?: Miniflare; + onBundleStart(_: BundleStartEvent) { - notImplemented(this.onBundleStart.name, this.constructor.name); + // Ignored in local runtime + } + + async #onBundleComplete(data: BundleCompleteEvent, id: number) { + try { + const options = buildMiniflareOptions(this.#log, data); + if (this.#mf === undefined) { + this.#mf = new Miniflare(options); + } else { + await this.#mf.setOptions(options); + } + // All asynchronous `Miniflare` methods will wait for all `setOptions()` + // calls to complete before resolving. To ensure we get the `url` and + // `inspectorUrl` for this set of `options`, we protect `#mf` with a mutex, + // so only update can happen at a time. + const userWorkerUrl = await this.#mf.ready; + const userWorkerInspectorUrl = await this.#mf.getInspectorURL(); + // If we received a new `bundleComplete` event before we were able to + // dispatch a `reloadComplete` for this bundle, ignore this bundle. + if (id !== this.#currentBundleId) return; + this.emitReloadCompleteEvent({ + type: "reloadComplete", + config: data.config, + bundle: data.bundle, + proxyData: { + userWorkerUrl: { + protocol: userWorkerUrl.protocol, + hostname: userWorkerUrl.hostname, + port: userWorkerUrl.port, + }, + userWorkerInspectorUrl: { + protocol: userWorkerInspectorUrl.protocol, + hostname: userWorkerInspectorUrl.hostname, + port: userWorkerInspectorUrl.port, + pathname: `/core:user:${getName(data.config)}`, + }, + userWorkerInnerUrlOverrides: { + protocol: data.config?.dev?.urlOverrides?.secure + ? "https:" + : "http:", + hostname: data.config?.dev?.urlOverrides?.hostname, + }, + headers: {}, + liveReload: data.config.dev?.liveReload, + proxyLogsToController: data.bundle.type === "service-worker", + }, + }); + } catch (error) { + this.emitErrorEvent({ + type: "error", + reason: "Error reloading local server", + cause: castErrorCause(error), + source: "LocalRuntimeController", + }); + } } - onBundleComplete(_: BundleCompleteEvent) { - notImplemented(this.onBundleComplete.name, this.constructor.name); + onBundleComplete(data: BundleCompleteEvent) { + const id = ++this.#currentBundleId; + this.emitReloadStartEvent({ + type: "reloadStart", + config: data.config, + bundle: data.bundle, + }); + void this.#mutex.runWith(() => this.#onBundleComplete(data, id)); } onPreviewTokenExpired(_: PreviewTokenExpiredEvent): void { - // ignore in local runtime + // Ignored in local runtime } + #teardown = async (): Promise => { + await this.#mf?.dispose(); + this.#mf = undefined; + }; async teardown() { - notImplemented(this.teardown.name, this.constructor.name); + return this.#mutex.runWith(this.#teardown); } // ********************* diff --git a/packages/wrangler/src/deployment-bundle/source-url.ts b/packages/wrangler/src/deployment-bundle/source-url.ts index 98a146264e5b..109b143423b7 100644 --- a/packages/wrangler/src/deployment-bundle/source-url.ts +++ b/packages/wrangler/src/deployment-bundle/source-url.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import { pathToFileURL } from "url"; import type { CfModule } from "./worker"; -function withSourceURL(source: string, sourcePath: string) { +export function withSourceURL(source: string, sourcePath: string) { return `${source}\n//# sourceURL=${pathToFileURL(sourcePath)}`; }