From f4e0bd9a40c75c7839ec7cb879a467eb1ee34929 Mon Sep 17 00:00:00 2001 From: George Fu Date: Thu, 17 Oct 2024 13:37:02 -0400 Subject: [PATCH] feat(util-stream): create checksum stream adapters (#1409) * feat(util-stream): create checksum stream adapters * add bundler metadata * move TransformStream checksum to flush event * improve uniformity of node/web checksumstream api * alphabetization * use class inheritance * inheritance issue in jest * add karma test for checksum stream * separate files --- .changeset/red-cameras-repair.md | 5 + packages/util-stream/karma.conf.js | 10 +- packages/util-stream/package.json | 3 + .../src/checksum/ChecksumStream.browser.ts | 39 +++++ .../src/checksum/ChecksumStream.ts | 118 +++++++++++++++ .../createChecksumStream.browser.spec.ts | 89 ++++++++++++ .../checksum/createChecksumStream.browser.ts | 82 +++++++++++ .../src/checksum/createChecksumStream.spec.ts | 136 ++++++++++++++++++ .../src/checksum/createChecksumStream.ts | 22 +++ packages/util-stream/src/index.ts | 2 + 10 files changed, 505 insertions(+), 1 deletion(-) create mode 100644 .changeset/red-cameras-repair.md create mode 100644 packages/util-stream/src/checksum/ChecksumStream.browser.ts create mode 100644 packages/util-stream/src/checksum/ChecksumStream.ts create mode 100644 packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts create mode 100644 packages/util-stream/src/checksum/createChecksumStream.browser.ts create mode 100644 packages/util-stream/src/checksum/createChecksumStream.spec.ts create mode 100644 packages/util-stream/src/checksum/createChecksumStream.ts diff --git a/.changeset/red-cameras-repair.md b/.changeset/red-cameras-repair.md new file mode 100644 index 00000000000..00f6218e7cf --- /dev/null +++ b/.changeset/red-cameras-repair.md @@ -0,0 +1,5 @@ +--- +"@smithy/util-stream": minor +--- + +create checksum stream adapter diff --git a/packages/util-stream/karma.conf.js b/packages/util-stream/karma.conf.js index f652e2493e6..b2ae368b568 100644 --- a/packages/util-stream/karma.conf.js +++ b/packages/util-stream/karma.conf.js @@ -3,7 +3,15 @@ module.exports = function (config) { config.set({ frameworks: ["jasmine", "karma-typescript"], - files: ["src/getAwsChunkedEncodingStream.browser.ts", "src/getAwsChunkedEncodingStream.browser.spec.ts"], + files: [ + "src/checksum/createChecksumStream.browser.spec.ts", + "src/checksum/createChecksumStream.browser.ts", + "src/checksum/ChecksumStream.browser.ts", + "src/getAwsChunkedEncodingStream.browser.spec.ts", + "src/getAwsChunkedEncodingStream.browser.ts", + "src/headStream.browser.ts", + "src/stream-type-check.ts", + ], exclude: ["**/*.d.ts"], preprocessors: { "**/*.ts": "karma-typescript", diff --git a/packages/util-stream/package.json b/packages/util-stream/package.json index f0f62d154c2..e4ecd15c130 100644 --- a/packages/util-stream/package.json +++ b/packages/util-stream/package.json @@ -55,16 +55,19 @@ "dist-*/**" ], "browser": { + "./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser", "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/headStream": "./dist-es/headStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", "./dist-es/splitStream": "./dist-es/splitStream.browser" }, "react-native": { + "./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser", "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", "./dist-es/headStream": "./dist-es/headStream.browser", "./dist-es/splitStream": "./dist-es/splitStream.browser", + "./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser", "./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser", "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser", "./dist-cjs/headStream": "./dist-cjs/headStream.browser", diff --git a/packages/util-stream/src/checksum/ChecksumStream.browser.ts b/packages/util-stream/src/checksum/ChecksumStream.browser.ts new file mode 100644 index 00000000000..18628dbc42c --- /dev/null +++ b/packages/util-stream/src/checksum/ChecksumStream.browser.ts @@ -0,0 +1,39 @@ +import { Checksum, Encoder } from "@smithy/types"; + +/** + * @internal + */ +export interface ChecksumStreamInit { + /** + * Base64 value of the expected checksum. + */ + expectedChecksum: string; + /** + * For error messaging, the location from which the checksum value was read. + */ + checksumSourceLocation: string; + /** + * The checksum calculator. + */ + checksum: Checksum; + /** + * The stream to be checked. + */ + source: ReadableStream; + + /** + * Optional base 64 encoder if calling from a request context. + */ + base64Encoder?: Encoder; +} + +const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {}; + +/** + * This stub exists so that the readable returned by createChecksumStream + * identifies as "ChecksumStream" in alignment with the Node.js + * implementation. + * + * @extends ReadableStream + */ +export class ChecksumStream extends (ReadableStreamRef as any) {} diff --git a/packages/util-stream/src/checksum/ChecksumStream.ts b/packages/util-stream/src/checksum/ChecksumStream.ts new file mode 100644 index 00000000000..769d0076d5e --- /dev/null +++ b/packages/util-stream/src/checksum/ChecksumStream.ts @@ -0,0 +1,118 @@ +import { Checksum, Encoder } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { Duplex, Readable } from "stream"; + +/** + * @internal + */ +export interface ChecksumStreamInit { + /** + * Base64 value of the expected checksum. + */ + expectedChecksum: string; + /** + * For error messaging, the location from which the checksum value was read. + */ + checksumSourceLocation: string; + /** + * The checksum calculator. + */ + checksum: Checksum; + /** + * The stream to be checked. + */ + source: T; + + /** + * Optional base 64 encoder if calling from a request context. + */ + base64Encoder?: Encoder; +} + +/** + * @internal + * + * Wrapper for throwing checksum errors for streams without + * buffering the stream. + * + */ +export class ChecksumStream extends Duplex { + private expectedChecksum: string; + private checksumSourceLocation: string; + private checksum: Checksum; + private source?: Readable; + private base64Encoder: Encoder; + + public constructor({ + expectedChecksum, + checksum, + source, + checksumSourceLocation, + base64Encoder, + }: ChecksumStreamInit) { + super(); + if (typeof (source as Readable).pipe === "function") { + this.source = source as Readable; + } else { + throw new Error( + `@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.` + ); + } + + this.base64Encoder = base64Encoder ?? toBase64; + this.expectedChecksum = expectedChecksum; + this.checksum = checksum; + this.checksumSourceLocation = checksumSourceLocation; + + // connect this stream to the end of the source stream. + this.source.pipe(this); + } + + /** + * @internal do not call this directly. + */ + public _read( + // eslint-disable-next-line @typescript-eslint/no-unused-vars + size: number + ): void {} + + /** + * @internal do not call this directly. + * + * When the upstream source flows data to this stream, + * calculate a step update of the checksum. + */ + public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void { + try { + this.checksum.update(chunk); + this.push(chunk); + } catch (e: unknown) { + return callback(e as Error); + } + return callback(); + } + + /** + * @internal do not call this directly. + * + * When the upstream source finishes, perform the checksum comparison. + */ + public async _final(callback: (err?: Error) => void): Promise { + try { + const digest: Uint8Array = await this.checksum.digest(); + const received = this.base64Encoder(digest); + if (this.expectedChecksum !== received) { + return callback( + new Error( + `Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` + + ` in response header "${this.checksumSourceLocation}".` + ) + ); + } + } catch (e: unknown) { + return callback(e as Error); + } + this.push(null); + return callback(); + } +} diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts new file mode 100644 index 00000000000..0f90a5eb2f0 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts @@ -0,0 +1,89 @@ +import { Checksum } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { toUtf8 } from "@smithy/util-utf8"; + +import { headStream } from "../headStream.browser"; +import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser"; +import { createChecksumStream } from "./createChecksumStream.browser"; + +describe("Checksum streams", () => { + /** + * Hash "algorithm" that appends all data together. + */ + class Appender implements Checksum { + public hash = ""; + async digest(): Promise { + return Buffer.from(this.hash); + } + reset(): void { + throw new Error("Function not implemented."); + } + update(chunk: Uint8Array): void { + this.hash += toUtf8(chunk); + } + } + + const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0))); + + const canonicalUtf8 = toUtf8(canonicalData); + const canonicalBase64 = toBase64(canonicalUtf8); + + describe(createChecksumStream.name + " webstreams API", () => { + if (typeof ReadableStream !== "function") { + // test not applicable to Node.js 16. + return; + } + + const makeStream = () => { + return new ReadableStream({ + start(controller) { + canonicalData.forEach((byte) => { + controller.enqueue(new Uint8Array([byte])); + }); + controller.close(); + }, + }); + }; + + it("should extend a ReadableStream", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: canonicalBase64, + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + expect(checksumStream).toBeInstanceOf(ReadableStream); + expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb); + + const collected = toUtf8(await headStream(checksumStream, Infinity)); + expect(collected).toEqual(canonicalUtf8); + expect(stream.locked).toEqual(true); + + // expectation is that it is resolved. + expect(await checksumStream.getReader().closed); + }); + + it("should throw during stream read if the checksum does not match", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: "different-expected-checksum", + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + try { + toUtf8(await headStream(checksumStream, Infinity)); + throw new Error("stream was read successfully"); + } catch (e: unknown) { + expect(String(e)).toEqual( + `Error: Checksum mismatch: expected "different-expected-checksum" but` + + ` received "${canonicalBase64}"` + + ` in response header "my-header".` + ); + } + }); + }); +}); diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts new file mode 100644 index 00000000000..6ca56bab956 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -0,0 +1,82 @@ +import { toBase64 } from "@smithy/util-base64"; + +import { isReadableStream } from "../stream-type-check"; +import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream.browser"; + +/** + * @internal + * Alias prevents compiler from turning + * ReadableStream into ReadableStream, which is incompatible + * with the NodeJS.ReadableStream global type. + */ +export type ReadableStreamType = ReadableStream; + +/** + * This is a local copy of + * https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController + * in case users do not have this type. + */ +interface TransformStreamDefaultController { + enqueue(chunk: any): void; + error(error: unknown): void; + terminate(): void; +} + +/** + * @internal + * + * Creates a stream adapter for throwing checksum errors for streams without + * buffering the stream. + */ +export const createChecksumStream = ({ + expectedChecksum, + checksum, + source, + checksumSourceLocation, + base64Encoder, +}: ChecksumStreamInit): ReadableStreamType => { + if (!isReadableStream(source)) { + throw new Error( + `@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.` + ); + } + + const encoder = base64Encoder ?? toBase64; + + if (typeof TransformStream !== "function") { + throw new Error( + "@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream." + ); + } + + const transform = new TransformStream({ + start() {}, + async transform(chunk: any, controller: TransformStreamDefaultController) { + /** + * When the upstream source flows data to this stream, + * calculate a step update of the checksum. + */ + checksum.update(chunk); + controller.enqueue(chunk); + }, + async flush(controller: TransformStreamDefaultController) { + const digest: Uint8Array = await checksum.digest(); + const received = encoder(digest); + + if (expectedChecksum !== received) { + const error = new Error( + `Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` + + ` in response header "${checksumSourceLocation}".` + ); + controller.error(error); + } else { + controller.terminate(); + } + }, + }); + + source.pipeThrough(transform); + const readable = transform.readable; + Object.setPrototypeOf(readable, ChecksumStream.prototype); + return readable; +}; diff --git a/packages/util-stream/src/checksum/createChecksumStream.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.spec.ts new file mode 100644 index 00000000000..8abf019b592 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.spec.ts @@ -0,0 +1,136 @@ +import { Checksum } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { toUtf8 } from "@smithy/util-utf8"; +import { Readable } from "stream"; + +import { headStream } from "../headStream"; +import { ChecksumStream } from "./ChecksumStream"; +import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser"; +import { createChecksumStream } from "./createChecksumStream"; + +describe("Checksum streams", () => { + /** + * Hash "algorithm" that appends all data together. + */ + class Appender implements Checksum { + public hash = ""; + async digest(): Promise { + return Buffer.from(this.hash); + } + reset(): void { + throw new Error("Function not implemented."); + } + update(chunk: Uint8Array): void { + this.hash += toUtf8(chunk); + } + } + + const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0))); + + const canonicalUtf8 = toUtf8(canonicalData); + const canonicalBase64 = toBase64(canonicalUtf8); + + describe(createChecksumStream.name, () => { + const makeStream = () => { + return Readable.from(Buffer.from(canonicalData.buffer, 0, 26)); + }; + + it("should extend a Readable stream", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: canonicalBase64, + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + expect(checksumStream).toBeInstanceOf(Readable); + expect(checksumStream).toBeInstanceOf(ChecksumStream); + + const collected = toUtf8(await headStream(checksumStream, Infinity)); + expect(collected).toEqual(canonicalUtf8); + expect(stream.readableEnded).toEqual(true); + expect(checksumStream.readableEnded).toEqual(true); + }); + + it("should throw during stream read if the checksum does not match", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: "different-expected-checksum", + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + try { + toUtf8(await headStream(checksumStream, Infinity)); + throw new Error("stream was read successfully"); + } catch (e: unknown) { + expect(String(e)).toEqual( + `Error: Checksum mismatch: expected "different-expected-checksum" but` + + ` received "${canonicalBase64}"` + + ` in response header "my-header".` + ); + } + }); + }); + + describe(createChecksumStream.name + " webstreams API", () => { + if (typeof ReadableStream !== "function") { + // test not applicable to Node.js 16. + return; + } + + const makeStream = () => { + return new ReadableStream({ + start(controller) { + canonicalData.forEach((byte) => { + controller.enqueue(new Uint8Array([byte])); + }); + controller.close(); + }, + }); + }; + + it("should extend a ReadableStream", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: canonicalBase64, + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + expect(checksumStream).toBeInstanceOf(ReadableStream); + expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb); + + const collected = toUtf8(await headStream(checksumStream, Infinity)); + expect(collected).toEqual(canonicalUtf8); + expect(stream.locked).toEqual(true); + + // expectation is that it is resolved. + expect(await checksumStream.getReader().closed); + }); + + it("should throw during stream read if the checksum does not match", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: "different-expected-checksum", + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + try { + toUtf8(await headStream(checksumStream, Infinity)); + throw new Error("stream was read successfully"); + } catch (e: unknown) { + expect(String(e)).toEqual( + `Error: Checksum mismatch: expected "different-expected-checksum" but` + + ` received "${canonicalBase64}"` + + ` in response header "my-header".` + ); + } + }); + }); +}); diff --git a/packages/util-stream/src/checksum/createChecksumStream.ts b/packages/util-stream/src/checksum/createChecksumStream.ts new file mode 100644 index 00000000000..348c8e8b860 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.ts @@ -0,0 +1,22 @@ +import { Readable } from "stream"; + +import { isReadableStream } from "../stream-type-check"; +import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream"; +import { createChecksumStream as createChecksumStreamWeb, ReadableStreamType } from "./createChecksumStream.browser"; + +/** + * @internal + * + * Creates a stream mirroring the input stream's interface, but + * performs checksumming when reading to the end of the stream. + */ +export function createChecksumStream(init: ChecksumStreamInit): ReadableStreamType; +export function createChecksumStream(init: ChecksumStreamInit): Readable; +export function createChecksumStream( + init: ChecksumStreamInit +): Readable | ReadableStreamType { + if (typeof ReadableStream === "function" && isReadableStream(init.source)) { + return createChecksumStreamWeb(init as ChecksumStreamInit); + } + return new ChecksumStream(init as ChecksumStreamInit); +} diff --git a/packages/util-stream/src/index.ts b/packages/util-stream/src/index.ts index 305896d2fa0..035d92f1e7e 100644 --- a/packages/util-stream/src/index.ts +++ b/packages/util-stream/src/index.ts @@ -4,3 +4,5 @@ export * from "./sdk-stream-mixin"; export * from "./splitStream"; export * from "./headStream"; export * from "./stream-type-check"; +export * from "./checksum/createChecksumStream"; +export * from "./checksum/ChecksumStream";