diff --git a/.changeset/shaggy-owls-happen.md b/.changeset/shaggy-owls-happen.md new file mode 100644 index 00000000000..137abb7ac39 --- /dev/null +++ b/.changeset/shaggy-owls-happen.md @@ -0,0 +1,6 @@ +--- +"@smithy/eventstream-serde-universal": patch +"@smithy/eventstream-codec": patch +--- + +merge initial-response event into operation output diff --git a/.gitignore b/.gitignore index 318dc60a888..2e9f855bd0d 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ smithy-typescript-integ-tests/yarn.lock # Issue https://github.com/awslabs/smithy-typescript/issues/425 smithy-typescript-codegen/bin/ +smithy-typescript-codegen-test/bin/ smithy-typescript-ssdk-codegen-test-utils/bin/ smithy-typescript-codegen-test/example-weather-customizations/bin/ diff --git a/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts b/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts index 41be8bac272..bad12c2edbc 100644 --- a/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts +++ b/packages/eventstream-codec/src/SmithyMessageDecoderStream.spec.ts @@ -35,4 +35,29 @@ describe("SmithyMessageDecoderStream", () => { expect(messages[0]).toEqual("first"); expect(messages[1]).toEqual("second"); }); + + it("is bufferable", async () => { + const stream = new SmithyMessageDecoderStream({ + deserializer: (_) => _ as any, + messageStream: [1, 2, 3, 4, 5] as any, + }); + + stream.push(10); + stream.unshift(9); + + const it = stream[Symbol.asyncIterator](); + + expect(await it.next()).toEqual({ value: 9, done: false }); + expect(await it.next()).toEqual({ value: 10, done: false }); + expect(await it.next()).toEqual({ value: 1, done: false }); + expect(await it.next()).toEqual({ value: 2, done: false }); + + stream.push(11); + expect(await it.next()).toEqual({ value: 11, done: false }); + + expect(await it.next()).toEqual({ value: 3, done: false }); + expect(await it.next()).toEqual({ value: 4, done: false }); + expect(await it.next()).toEqual({ value: 5, done: false }); + expect(await it.next()).toEqual({ value: undefined, done: true }); + }); }); diff --git a/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts b/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts index 18e86f65c44..9c66d65310c 100644 --- a/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts +++ b/packages/eventstream-codec/src/SmithyMessageDecoderStream.ts @@ -12,14 +12,26 @@ export interface SmithyMessageDecoderStreamOptions { * @internal */ export class SmithyMessageDecoderStream implements AsyncIterable { + private buffer = [] as T[]; constructor(private readonly options: SmithyMessageDecoderStreamOptions) {} [Symbol.asyncIterator](): AsyncIterator { return this.asyncIterator(); } + public unshift(item: T) { + this.buffer.unshift(item); + } + + public push(item: T) { + this.buffer.push(item); + } + private async *asyncIterator() { for await (const message of this.options.messageStream) { + while (this.buffer.length > 0) { + yield this.buffer.shift() as Awaited; + } const deserialized = await this.options.deserializer(message); if (deserialized === undefined) continue; yield deserialized; diff --git a/packages/eventstream-serde-universal/src/bufferInitialResponse.ts b/packages/eventstream-serde-universal/src/bufferInitialResponse.ts new file mode 100644 index 00000000000..a6d365588bc --- /dev/null +++ b/packages/eventstream-serde-universal/src/bufferInitialResponse.ts @@ -0,0 +1,40 @@ +import { EventStreamSerdeContext } from "@smithy/types"; + +/** + * @internal + * + * Attempts to merge the first event if it is the initial-response event type + * into the operation output. + * + * If it is not the initial-response type, the value is restacked into the + * iterator and the remaining iterations are pass-through to the + * event stream. + */ +export async function bufferInitialResponse( + field: string, + deser: Function, + output: any, + context: EventStreamSerdeContext +) { + const contents = { [field]: null as any }; + const controller = deser(output.body, context) as any; + const it = controller[Symbol.asyncIterator]() as any; + + const initialResponse = (await it.next()) ?? {}; + + if ("initial-response" in (initialResponse.value || {})) { + Object.assign(contents, initialResponse.value["initial-response"]); + } else { + controller.push(initialResponse.value); + } + + contents[field] = { + async *[Symbol.asyncIterator]() { + while (!it.done) { + yield (await it.next()).value; + } + }, + }; + + return contents; +} diff --git a/packages/eventstream-serde-universal/src/index.ts b/packages/eventstream-serde-universal/src/index.ts index 9f8e9f7e33a..79b20e25e02 100644 --- a/packages/eventstream-serde-universal/src/index.ts +++ b/packages/eventstream-serde-universal/src/index.ts @@ -6,3 +6,7 @@ export * from "./EventStreamMarshaller"; * @internal */ export * from "./provider"; +/** + * @internal + */ +export * from "./bufferInitialResponse"; diff --git a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/TypeScriptDependency.java b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/TypeScriptDependency.java index 61e1f9d85e7..83c224003ea 100644 --- a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/TypeScriptDependency.java +++ b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/TypeScriptDependency.java @@ -101,6 +101,7 @@ public enum TypeScriptDependency implements Dependency { false), AWS_SDK_EVENTSTREAM_SERDE_NODE("dependencies", "@smithy/eventstream-serde-node", false), AWS_SDK_EVENTSTREAM_SERDE_BROWSER("dependencies", "@smithy/eventstream-serde-browser", false), + AWS_SDK_EVENTSTREAM_SERDE_UNIVERSAL("dependencies", "@smithy/eventstream-serde-universal", false), // Conditionally added if a big decimal shape is found in a model. BIG_JS("dependencies", "big.js", "^6.0.0", false), diff --git a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/EventStreamGenerator.java b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/EventStreamGenerator.java index 9d80dd5f327..804e894225f 100644 --- a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/EventStreamGenerator.java +++ b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/EventStreamGenerator.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.stream.Collectors; import software.amazon.smithy.codegen.core.CodegenException; @@ -38,7 +39,9 @@ import software.amazon.smithy.model.traits.ErrorTrait; import software.amazon.smithy.model.traits.EventHeaderTrait; import software.amazon.smithy.model.traits.EventPayloadTrait; +import software.amazon.smithy.model.traits.JsonNameTrait; import software.amazon.smithy.model.traits.StreamingTrait; +import software.amazon.smithy.model.traits.StringTrait; import software.amazon.smithy.typescript.codegen.TypeScriptDependency; import software.amazon.smithy.typescript.codegen.TypeScriptWriter; import software.amazon.smithy.typescript.codegen.integration.ProtocolGenerator.GenerationContext; @@ -110,7 +113,7 @@ public void generateEventStreamSerializers( Set eventShapes = eventsUnion.members().stream() .map(member -> model.expectShape(member.getTarget()).asStructureShape().get()) .collect(Collectors.toSet()); - eventShapes.forEach(eventShapesToMarshall::add); + eventShapesToMarshall.addAll(eventShapes); } } @@ -153,22 +156,21 @@ public void generateEventStreamDeserializers( TopDownIndex topDownIndex = TopDownIndex.of(model); Set operations = topDownIndex.getContainedOperations(service); - TreeSet eventUnionsToDeserialize = new TreeSet<>(); + TreeMap eventUnionsToDeserialize = new TreeMap<>(); TreeSet eventShapesToUnmarshall = new TreeSet<>(); for (OperationShape operation : operations) { if (hasEventStreamOutput(context, operation)) { UnionShape eventsUnion = getEventStreamOutputShape(context, operation); - eventUnionsToDeserialize.add(eventsUnion); + eventUnionsToDeserialize.put(eventsUnion, operation); Set eventShapes = eventsUnion.members().stream() .map(member -> model.expectShape(member.getTarget()).asStructureShape().get()) .collect(Collectors.toSet()); - eventShapes.forEach(eventShapesToUnmarshall::add); + eventShapesToUnmarshall.addAll(eventShapes); } } - eventUnionsToDeserialize.forEach(eventsUnion -> { - generateEventStreamDeserializer(context, eventsUnion); - }); + eventUnionsToDeserialize.forEach((key, value) -> generateEventStreamDeserializer(context, key, value)); + eventShapesToUnmarshall.forEach(event -> { generateEventUnmarshaller( context, @@ -393,7 +395,9 @@ private void writeEventBody( } } - private void generateEventStreamDeserializer(GenerationContext context, UnionShape eventsUnion) { + private void generateEventStreamDeserializer(GenerationContext context, + UnionShape eventsUnion, + OperationShape operation) { String methodName = getDeserFunctionName(context, eventsUnion); String methodLongName = ProtocolGenerator.getDeserFunctionName(getSymbol(context, eventsUnion), context.getProtocolName()); @@ -410,6 +414,7 @@ private void generateEventStreamDeserializer(GenerationContext context, UnionSha writer.openBlock("return context.eventStreamMarshaller.deserialize(", ");", () -> { writer.write("output,"); writer.openBlock("async event => {", "}", () -> { + // regular union members. eventsUnion.getAllMembers().forEach((name, member) -> { StructureShape event = model.expectShape(member.getTarget(), StructureShape.class); writer.openBlock("if (event[$S] != null) {", "}", name, () -> { @@ -419,6 +424,26 @@ private void generateEventStreamDeserializer(GenerationContext context, UnionSha }); }); }); + // implicit initial-response union member. + writer.openBlock("if (event['initial-response'] != null) {", "}", () -> { + writer.write("const ir = await parseBody(event['initial-response'].body, context)"); + writer.addImport("map", "", TypeScriptDependency.AWS_SMITHY_CLIENT); + writer.openBlock("return { 'initial-response': map({", "}) } as any", () -> { + context.getModel().expectShape(operation.getOutputShape()).members().forEach(member -> { + boolean isStreaming = model.expectShape(member.getTarget()) + .hasTrait(StreamingTrait.class); + if (!isStreaming) { + writer.write("$L: [, ir.$L]", + member.getTrait(JsonNameTrait.class) + .map(StringTrait::getValue) + .orElse(member.getMemberName()), + member.getMemberName() + ); + } + }); + }); + }); + writer.write("return {$$unknown: output};"); }); }); diff --git a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/HttpRpcProtocolGenerator.java b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/HttpRpcProtocolGenerator.java index bb63d58f91f..dd126e65a9f 100644 --- a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/HttpRpcProtocolGenerator.java +++ b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/HttpRpcProtocolGenerator.java @@ -29,6 +29,7 @@ import software.amazon.smithy.model.shapes.Shape; import software.amazon.smithy.model.shapes.StructureShape; import software.amazon.smithy.model.traits.EndpointTrait; +import software.amazon.smithy.model.traits.StreamingTrait; import software.amazon.smithy.typescript.codegen.ApplicationProtocol; import software.amazon.smithy.typescript.codegen.CodegenUtils; import software.amazon.smithy.typescript.codegen.TypeScriptDependency; @@ -528,11 +529,23 @@ private void readResponseBody(GenerationContext context, OperationShape operatio StructureShape outputShape = context.getModel().expectShape(outputId).asStructureShape().get(); if (EventStreamGenerator.hasEventStreamOutput(context, operation)) { // There must only one eventstream member in response structure. - MemberShape member = outputShape.members().stream().collect(Collectors.toList()).get(0); + MemberShape member = outputShape.members().stream().filter( + (MemberShape m) -> context.getModel() + .expectShape(m.getTarget()) + .hasTrait(StreamingTrait.class) + ).findFirst().orElseThrow(); Shape target = context.getModel().expectShape(member.getTarget()); Symbol targetSymbol = context.getSymbolProvider().toSymbol(target); - writer.write("const contents = { $L: $L(output.body, context) };", member.getMemberName(), - ProtocolGenerator.getDeserFunctionShortName(targetSymbol)); + + writer.addImport( + "bufferInitialResponse", null, + TypeScriptDependency.AWS_SDK_EVENTSTREAM_SERDE_UNIVERSAL + ); + writer.write( + "const contents = await bufferInitialResponse($S, $L, output, context);", + member.getMemberName(), + ProtocolGenerator.getDeserFunctionShortName(targetSymbol) + ); } else { // We only need to load the body and prepare a contents object if there is a response. writer.write("const data: any = await parseBody(output.body, context)");