Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wrangler): add local DO support with getPlatformProxy #7292

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 72 additions & 124 deletions packages/wrangler/src/api/integrations/platform/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import { kCurrentWorker, Miniflare } from "miniflare";
import { kCurrentWorker } from "miniflare";
import { getAssetsOptions } from "../../../assets";
import { readConfig } from "../../../config";
import { DEFAULT_MODULE_RULES } from "../../../deployment-bundle/rules";
import { getBindings } from "../../../dev";
import { getBoundRegisteredWorkers } from "../../../dev-registry";
import { getClassNamesWhichUseSQLite } from "../../../dev/class-names-sqlite";
import { getVarsForDev } from "../../../dev/dev-vars";
import {
buildAssetOptions,
buildMiniflareBindingOptions,
buildSitesOptions,
} from "../../../dev/miniflare";
import { run } from "../../../experimental-flags";
import { getLegacyAssetPaths, getSiteAssetPaths } from "../../../sites";
import { startWorker } from "../../startDevWorker";
import { CacheStorage } from "./caches";
import { ExecutionContext } from "./executionContext";
import { getServiceBindings } from "./services";
import type { Config, RawConfig, RawEnvironment } from "../../../config";
import type { StartDevWorkerInput, Worker } from "../../startDevWorker";
import type { IncomingRequestCfProperties } from "@cloudflare/workers-types/experimental";
import type { MiniflareOptions, ModuleRule, WorkerOptions } from "miniflare";
import type { ModuleRule, WorkerOptions } from "miniflare";

export { readConfig as unstable_readConfig };
export type {
Expand Down Expand Up @@ -87,6 +86,11 @@ export type PlatformProxy<
dispose: () => Promise<void>;
};

/**
* The running worker instances with their reference count
*/
const workers = new Map<Worker, number>();

/**
* By reading from a Wrangler configuration file this function generates proxy objects that can be
* used to simulate the interaction with the Cloudflare platform during local development
Expand All @@ -101,132 +105,76 @@ export async function getPlatformProxy<
>(
options: GetPlatformProxyOptions = {}
): Promise<PlatformProxy<Env, CfProperties>> {
const env = options.environment;
// TODO: Allow skipping custom build

const rawConfig = readConfig({
config: options.configPath,
env,
});

const miniflareOptions = await run(
return await run(
{
FILE_BASED_REGISTRY: Boolean(options.experimentalRegistry ?? true),
MULTIWORKER: false,
RESOURCES_PROVISION: false,
},
() => getMiniflareOptionsFromConfig(rawConfig, env, options)
);

const mf = new Miniflare({
script: "",
modules: true,
...(miniflareOptions as Record<string, unknown>),
});

const bindings: Env = await mf.getBindings();

const vars = getVarsForDev(rawConfig, env);

const cf = await mf.getCf();
deepFreeze(cf);

return {
env: {
...vars,
...bindings,
},
cf: cf as CfProperties,
ctx: new ExecutionContext(),
caches: new CacheStorage(),
dispose: () => mf.dispose(),
};
}

async function getMiniflareOptionsFromConfig(
rawConfig: Config,
env: string | undefined,
options: GetPlatformProxyOptions
): Promise<Partial<MiniflareOptions>> {
const bindings = getBindings(rawConfig, env, true, {});

const workerDefinitions = await getBoundRegisteredWorkers({
name: rawConfig.name,
services: bindings.services,
durableObjects: rawConfig["durable_objects"],
});

const { bindingOptions, externalWorkers } = buildMiniflareBindingOptions({
name: undefined,
bindings,
workerDefinitions,
queueConsumers: undefined,
services: rawConfig.services,
serviceBindings: {},
migrations: rawConfig.migrations,
});

const persistOptions = getMiniflarePersistOptions(options.persist);

const serviceBindings = await getServiceBindings(bindings.services);

const miniflareOptions: MiniflareOptions = {
workers: [
{
script: "",
modules: true,
...bindingOptions,
serviceBindings: {
...serviceBindings,
...bindingOptions.serviceBindings,
async () => {
const input: StartDevWorkerInput = {
config: options.configPath,
env: options.environment,
dev: {
inspector: {
port: 0,
},
server: {
port: 0,
},
logLevel: "error",
liveReload: false,
persist:
typeof options.persist === "object"
? options.persist.path
: options.persist
? ".wrangler/state/v3"
: undefined,
},
},
...externalWorkers,
],
...persistOptions,
};

return miniflareOptions;
}

/**
* Get the persist option properties to pass to miniflare
*
* @param persist The user provided persistence option
* @returns an object containing the properties to pass to miniflare
*/
function getMiniflarePersistOptions(
persist: GetPlatformProxyOptions["persist"]
): Pick<
MiniflareOptions,
| "kvPersist"
| "durableObjectsPersist"
| "r2Persist"
| "d1Persist"
| "workflowsPersist"
> {
if (persist === false) {
// the user explicitly asked for no persistance
return {
kvPersist: false,
durableObjectsPersist: false,
r2Persist: false,
d1Persist: false,
workflowsPersist: false,
};
}

const defaultPersistPath = ".wrangler/state/v3";

const persistPath =
typeof persist === "object" ? persist.path : defaultPersistPath;

return {
kvPersist: `${persistPath}/kv`,
durableObjectsPersist: `${persistPath}/do`,
r2Persist: `${persistPath}/r2`,
d1Persist: `${persistPath}/d1`,
workflowsPersist: `${persistPath}/workflows`,
};
};

// Find an existing worker with the same input
let worker = Array.from(workers.keys()).find((w) => {
return JSON.stringify(w.input) === JSON.stringify(input);
});

// Start a new worker if none was found
if (!worker) {
worker = await startWorker(input);
}

// Update the reference count
workers.set(worker, (workers.get(worker) ?? 0) + 1);

const { env, cf } = await worker.getPlatformProxy();
deepFreeze(cf);

return {
env: env as Env,
cf: cf as CfProperties,
ctx: new ExecutionContext(),
caches: new CacheStorage(),
dispose: async () => {
const count = workers.get(worker);

if (count !== undefined) {
// Don't dispose the worker if it's still in use
if (count > 1) {
workers.set(worker, count - 1);
return;
}

// Remove the worker from the map before disposing it
workers.delete(worker);
}

await worker.dispose();
},
};
}
);
}

function deepFreeze<T extends Record<string | number | symbol, unknown>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async function resolveDevConfig(
hostname: host ?? getInferredHost(routes, config.configPath),
},
liveReload: input.dev?.liveReload || false,
logLevel: input.dev?.logLevel,
testScheduled: input.dev?.testScheduled,
// absolute resolved path
persist: localPersistencePath,
Expand Down
27 changes: 27 additions & 0 deletions packages/wrangler/src/api/startDevWorker/DevEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ export class DevEnv extends EventEmitter {
async startWorker(options: StartDevWorkerInput): Promise<Worker> {
const worker = createWorkerObject(this);

if (options.dev?.logLevel) {
logger.loggerLevel = options.dev.logLevel;
}

await this.config.set(options);

return worker;
Expand Down Expand Up @@ -148,6 +152,10 @@ function createWorkerObject(devEnv: DevEnv): Worker {
assert(devEnv.config.latestConfig);
return devEnv.config.latestConfig;
},
get input() {
assert(devEnv.config.latestInput);
return devEnv.config.latestInput;
},
setConfig(config) {
return devEnv.config.set(config);
},
Expand Down Expand Up @@ -178,6 +186,25 @@ function createWorkerObject(devEnv: DevEnv): Worker {
const w = await proxyWorker.getWorker(this.config.name);
return w.scheduled(...args);
},
async getPlatformProxy() {
const local = devEnv.runtimes.find(
(ctrl) => ctrl instanceof LocalRuntimeController
);

if (!local) {
throw new Error("The platform proxy is only available in local mode");
}

const [env, cf] = await Promise.all([
local.getBindingsProxy(),
local.getCfProxy(),
]);

return {
env,
cf,
};
},
async dispose() {
await devEnv.teardown();
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { randomUUID } from "node:crypto";
import events from "node:events";
import { readFile } from "node:fs/promises";
import chalk from "chalk";
import { Miniflare, Mutex } from "miniflare";
Expand Down Expand Up @@ -141,6 +142,49 @@ export class LocalRuntimeController extends RuntimeController {
#mutex = new Mutex();
#mf?: Miniflare;

#bindings?: Record<string, unknown>;
#cf?: Record<string, unknown>;

async getMiniflareInstance() {
if (!this.#mf) {
await events.once(this, "reloadComplete");

if (!this.#mf) {
throw new Error("Miniflare instance is not available after reload");
}
}

return this.#mf;
}

async getBindingsProxy() {
// Initialize the bindings from the Miniflare instance, which will be updated on each reload
if (!this.#bindings) {
const mf = await this.getMiniflareInstance();
this.#bindings = await mf.getBindings();
}

return new Proxy({} as Record<string, unknown>, {
get: (_, prop, receiver) => {
return Reflect.get(this.#bindings ?? {}, prop, receiver);
},
});
}

async getCfProxy() {
// Initialize the cf properties from the Miniflare instance, which will be updated on each reload
if (!this.#cf) {
const mf = await this.getMiniflareInstance();
this.#cf = await mf.getCf();
}

return new Proxy({} as Record<string, unknown>, {
get: (_, prop, receiver) => {
return Reflect.get(this.#cf ?? {}, prop, receiver);
},
});
}

onBundleStart(_: BundleStartEvent) {
// Ignored in local runtime
}
Expand All @@ -161,6 +205,16 @@ export class LocalRuntimeController extends RuntimeController {
logger.log(chalk.dim("⎔ Reloading local server..."));

await this.#mf.setOptions(options);

// If the bindings were fetched, ensure they're up-to-date
if (this.#bindings) {
this.#bindings = await this.#mf.getBindings();
}

// If the cf properties were fetched, ensure they're up-to-date
if (this.#cf) {
this.#cf = await this.#mf.getCf();
}
}
// All asynchronous `Miniflare` methods will wait for all `setOptions()`
// calls to complete before resolving. To ensure we get the `url` and
Expand Down
5 changes: 5 additions & 0 deletions packages/wrangler/src/api/startDevWorker/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,16 @@ export interface Worker {
url: Promise<URL>;
inspectorUrl: Promise<URL>;
config: StartDevWorkerOptions;
input: StartDevWorkerInput;
setConfig: ConfigController["set"];
patchConfig: ConfigController["patch"];
fetch: DispatchFetch;
scheduled: MiniflareWorker["scheduled"];
queue: MiniflareWorker["queue"];
getPlatformProxy(): Promise<{
env: Record<string, unknown>;
cf: Record<string, unknown>;
}>;
dispose(): Promise<void>;
}

Expand Down
Loading