Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
optimization for Effect handlers (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Dec 13, 2023
1 parent 4f0166e commit 4b28924
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
3 changes: 2 additions & 1 deletion packages/platform-browser/test/Worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ describe("Worker", () => {
spawn: () => new globalThis.SharedWorker(new URL("./fixtures/serializedWorker.ts", import.meta.url)),
size: 1
}))
const user = yield* _(pool.executeEffect(new GetUserById({ id: 123 })))
let user = yield* _(pool.executeEffect(new GetUserById({ id: 123 })))
user = yield* _(pool.executeEffect(new GetUserById({ id: 123 })))
assert.deepStrictEqual(user, new User({ id: 123, name: "test" }))
const people = yield* _(pool.execute(new GetPersonById({ id: 123 })), Stream.runCollect)
assert.deepStrictEqual(Chunk.toReadonlyArray(people), [
Expand Down
41 changes: 31 additions & 10 deletions packages/platform/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as Effect from "effect/Effect"
import * as Either from "effect/Either"
import * as Fiber from "effect/Fiber"
import { pipe } from "effect/Function"
import * as Predicate from "effect/Predicate"
import * as Queue from "effect/Queue"
import type * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
Expand Down Expand Up @@ -116,23 +117,43 @@ export const makeSerialized = <
never
> => {
const parseRequest = Schema.decode(schema)
return make((request: I) =>
pipe(
parseRequest(request),
Stream.flatMap((request: A) => {
const effectTags = new Set<string>()
return make((request: I) => {
if (Predicate.hasProperty(request, "_tag") && effectTags.has(request._tag as string)) {
return Effect.flatMap(parseRequest(request), (request: A) => {
const handler =
(handlers as unknown as Record<string, (req: unknown) => Stream.Stream<never, any, any>>)[request._tag]
(handlers as unknown as Record<string, (req: unknown) => Effect.Effect<never, any, any>>)[request._tag]
if (!handler) {
return Stream.dieMessage(`No handler for ${request._tag}`)
return Effect.dieMessage(`No handler for ${request._tag}`)
}
const encodeSuccess = Schema.encode(Serializable.successSchema(request as any))
return pipe(
handler(request),
Stream.catchAll((error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail)),
Stream.mapEffect(encodeSuccess)
Effect.matchEffect(handler(request), {
onFailure: (error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail),
onSuccess: encodeSuccess
})
)
})
), {
}

return Stream.flatMap(parseRequest(request), (request: A) => {
const handler =
(handlers as unknown as Record<string, (req: unknown) => Stream.Stream<never, any, any>>)[request._tag]
if (!handler) {
return Stream.dieMessage(`No handler for ${request._tag}`)
}
const encodeSuccess = Schema.encode(Serializable.successSchema(request as any))
const stream = handler(request)
if (Effect.isEffect(stream)) {
effectTags.add(request._tag)
}
return pipe(
stream,
Stream.catchAll((error) => Effect.flatMap(Serializable.serializeFailure(request as any, error), Effect.fail)),
Stream.mapEffect(encodeSuccess)
)
})
}, {
transfers(message) {
return Transferable.get(message)
}
Expand Down

0 comments on commit 4b28924

Please sign in to comment.