From 4b2892465d03429a89254a9c0bdf4d69c5bfc22d Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 13 Dec 2023 14:45:12 +1300 Subject: [PATCH] optimization for Effect handlers (#337) --- packages/platform-browser/test/Worker.test.ts | 3 +- .../platform/src/internal/workerRunner.ts | 41 ++++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/packages/platform-browser/test/Worker.test.ts b/packages/platform-browser/test/Worker.test.ts index 572ce63b..2a3925ba 100644 --- a/packages/platform-browser/test/Worker.test.ts +++ b/packages/platform-browser/test/Worker.test.ts @@ -31,7 +31,8 @@ describe("Worker", () => { spawn: () => new globalThis.SharedWorker(new URL("./fixtures/serializedWorker.ts", import.meta.url)), size: 1 })) - const user = yield* _(pool.executeEffect(new GetUserById({ id: 123 }))) + let user = yield* _(pool.executeEffect(new GetUserById({ id: 123 }))) + user = yield* _(pool.executeEffect(new GetUserById({ id: 123 }))) assert.deepStrictEqual(user, new User({ id: 123, name: "test" })) const people = yield* _(pool.execute(new GetPersonById({ id: 123 })), Stream.runCollect) assert.deepStrictEqual(Chunk.toReadonlyArray(people), [ diff --git a/packages/platform/src/internal/workerRunner.ts b/packages/platform/src/internal/workerRunner.ts index 561af6ba..44aebd16 100644 --- a/packages/platform/src/internal/workerRunner.ts +++ b/packages/platform/src/internal/workerRunner.ts @@ -6,6 +6,7 @@ import * as Effect from "effect/Effect" import * as Either from "effect/Either" import * as Fiber from "effect/Fiber" import { pipe } from "effect/Function" +import * as Predicate from "effect/Predicate" import * as Queue from "effect/Queue" import type * as Scope from "effect/Scope" import * as Stream from "effect/Stream" @@ -116,23 +117,43 @@ export const makeSerialized = < never > => { const parseRequest = Schema.decode(schema) - return make((request: I) => - pipe( - parseRequest(request), - Stream.flatMap((request: A) => { + const effectTags = new Set() + return make((request: I) => { + if (Predicate.hasProperty(request, "_tag") && effectTags.has(request._tag as string)) { + return Effect.flatMap(parseRequest(request), (request: A) => { const handler = - (handlers as unknown as Record Stream.Stream>)[request._tag] + (handlers as unknown as Record Effect.Effect>)[request._tag] if (!handler) { - return Stream.dieMessage(`No handler for ${request._tag}`) + return Effect.dieMessage(`No handler for ${request._tag}`) } const encodeSuccess = Schema.encode(Serializable.successSchema(request as any)) return pipe( - handler(request), - Stream.catchAll((error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail)), - Stream.mapEffect(encodeSuccess) + Effect.matchEffect(handler(request), { + onFailure: (error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail), + onSuccess: encodeSuccess + }) ) }) - ), { + } + + return Stream.flatMap(parseRequest(request), (request: A) => { + const handler = + (handlers as unknown as Record Stream.Stream>)[request._tag] + if (!handler) { + return Stream.dieMessage(`No handler for ${request._tag}`) + } + const encodeSuccess = Schema.encode(Serializable.successSchema(request as any)) + const stream = handler(request) + if (Effect.isEffect(stream)) { + effectTags.add(request._tag) + } + return pipe( + stream, + Stream.catchAll((error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail)), + Stream.mapEffect(encodeSuccess) + ) + }) + }, { transfers(message) { return Transferable.get(message) }