Skip to content

Commit

Permalink
feat(util-stream): create checksum stream adapters (#1409)
Browse files Browse the repository at this point in the history
* feat(util-stream): create checksum stream adapters

* add bundler metadata

* move TransformStream checksum to flush event

* improve uniformity of node/web checksumstream api

* alphabetization

* use class inheritance

* inheritance issue in jest

* add karma test for checksum stream

* separate files
  • Loading branch information
kuhe authored Oct 17, 2024
1 parent 536fb7f commit f4e0bd9
Show file tree
Hide file tree
Showing 10 changed files with 505 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/red-cameras-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@smithy/util-stream": minor
---

create checksum stream adapter
10 changes: 9 additions & 1 deletion packages/util-stream/karma.conf.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
module.exports = function (config) {
config.set({
frameworks: ["jasmine", "karma-typescript"],
files: ["src/getAwsChunkedEncodingStream.browser.ts", "src/getAwsChunkedEncodingStream.browser.spec.ts"],
files: [
"src/checksum/createChecksumStream.browser.spec.ts",
"src/checksum/createChecksumStream.browser.ts",
"src/checksum/ChecksumStream.browser.ts",
"src/getAwsChunkedEncodingStream.browser.spec.ts",
"src/getAwsChunkedEncodingStream.browser.ts",
"src/headStream.browser.ts",
"src/stream-type-check.ts",
],
exclude: ["**/*.d.ts"],
preprocessors: {
"**/*.ts": "karma-typescript",
Expand Down
3 changes: 3 additions & 0 deletions packages/util-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@
"dist-*/**"
],
"browser": {
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser"
},
"react-native": {
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser",
"./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser",
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
Expand Down
39 changes: 39 additions & 0 deletions packages/util-stream/src/checksum/ChecksumStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Checksum, Encoder } from "@smithy/types";

/**
* @internal
*/
export interface ChecksumStreamInit {
/**
* Base64 value of the expected checksum.
*/
expectedChecksum: string;
/**
* For error messaging, the location from which the checksum value was read.
*/
checksumSourceLocation: string;
/**
* The checksum calculator.
*/
checksum: Checksum;
/**
* The stream to be checked.
*/
source: ReadableStream;

/**
* Optional base 64 encoder if calling from a request context.
*/
base64Encoder?: Encoder;
}

const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {};

/**
* This stub exists so that the readable returned by createChecksumStream
* identifies as "ChecksumStream" in alignment with the Node.js
* implementation.
*
* @extends ReadableStream
*/
export class ChecksumStream extends (ReadableStreamRef as any) {}
118 changes: 118 additions & 0 deletions packages/util-stream/src/checksum/ChecksumStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { Checksum, Encoder } from "@smithy/types";
import { toBase64 } from "@smithy/util-base64";
import { Duplex, Readable } from "stream";

/**
* @internal
*/
export interface ChecksumStreamInit<T extends Readable | ReadableStream> {
/**
* Base64 value of the expected checksum.
*/
expectedChecksum: string;
/**
* For error messaging, the location from which the checksum value was read.
*/
checksumSourceLocation: string;
/**
* The checksum calculator.
*/
checksum: Checksum;
/**
* The stream to be checked.
*/
source: T;

/**
* Optional base 64 encoder if calling from a request context.
*/
base64Encoder?: Encoder;
}

/**
* @internal
*
* Wrapper for throwing checksum errors for streams without
* buffering the stream.
*
*/
export class ChecksumStream extends Duplex {
private expectedChecksum: string;
private checksumSourceLocation: string;
private checksum: Checksum;
private source?: Readable;
private base64Encoder: Encoder;

public constructor({
expectedChecksum,
checksum,
source,
checksumSourceLocation,
base64Encoder,
}: ChecksumStreamInit<Readable>) {
super();
if (typeof (source as Readable).pipe === "function") {
this.source = source as Readable;
} else {
throw new Error(
`@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.`
);
}

this.base64Encoder = base64Encoder ?? toBase64;
this.expectedChecksum = expectedChecksum;
this.checksum = checksum;
this.checksumSourceLocation = checksumSourceLocation;

// connect this stream to the end of the source stream.
this.source.pipe(this);
}

/**
* @internal do not call this directly.
*/
public _read(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
size: number
): void {}

/**
* @internal do not call this directly.
*
* When the upstream source flows data to this stream,
* calculate a step update of the checksum.
*/
public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void {
try {
this.checksum.update(chunk);
this.push(chunk);
} catch (e: unknown) {
return callback(e as Error);
}
return callback();
}

/**
* @internal do not call this directly.
*
* When the upstream source finishes, perform the checksum comparison.
*/
public async _final(callback: (err?: Error) => void): Promise<void> {
try {
const digest: Uint8Array = await this.checksum.digest();
const received = this.base64Encoder(digest);
if (this.expectedChecksum !== received) {
return callback(
new Error(
`Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` +
` in response header "${this.checksumSourceLocation}".`
)
);
}
} catch (e: unknown) {
return callback(e as Error);
}
this.push(null);
return callback();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Checksum } from "@smithy/types";
import { toBase64 } from "@smithy/util-base64";
import { toUtf8 } from "@smithy/util-utf8";

import { headStream } from "../headStream.browser";
import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser";
import { createChecksumStream } from "./createChecksumStream.browser";

describe("Checksum streams", () => {
/**
* Hash "algorithm" that appends all data together.
*/
class Appender implements Checksum {
public hash = "";
async digest(): Promise<Uint8Array> {
return Buffer.from(this.hash);
}
reset(): void {
throw new Error("Function not implemented.");
}
update(chunk: Uint8Array): void {
this.hash += toUtf8(chunk);
}
}

const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0)));

const canonicalUtf8 = toUtf8(canonicalData);
const canonicalBase64 = toBase64(canonicalUtf8);

describe(createChecksumStream.name + " webstreams API", () => {
if (typeof ReadableStream !== "function") {
// test not applicable to Node.js 16.
return;
}

const makeStream = () => {
return new ReadableStream({
start(controller) {
canonicalData.forEach((byte) => {
controller.enqueue(new Uint8Array([byte]));
});
controller.close();
},
});
};

it("should extend a ReadableStream", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: canonicalBase64,
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

expect(checksumStream).toBeInstanceOf(ReadableStream);
expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb);

const collected = toUtf8(await headStream(checksumStream, Infinity));
expect(collected).toEqual(canonicalUtf8);
expect(stream.locked).toEqual(true);

// expectation is that it is resolved.
expect(await checksumStream.getReader().closed);
});

it("should throw during stream read if the checksum does not match", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: "different-expected-checksum",
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

try {
toUtf8(await headStream(checksumStream, Infinity));
throw new Error("stream was read successfully");
} catch (e: unknown) {
expect(String(e)).toEqual(
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
` received "${canonicalBase64}"` +
` in response header "my-header".`
);
}
});
});
});
82 changes: 82 additions & 0 deletions packages/util-stream/src/checksum/createChecksumStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { toBase64 } from "@smithy/util-base64";

import { isReadableStream } from "../stream-type-check";
import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream.browser";

/**
* @internal
* Alias prevents compiler from turning
* ReadableStream into ReadableStream<any>, which is incompatible
* with the NodeJS.ReadableStream global type.
*/
export type ReadableStreamType = ReadableStream;

/**
* This is a local copy of
* https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController
* in case users do not have this type.
*/
interface TransformStreamDefaultController {
enqueue(chunk: any): void;
error(error: unknown): void;
terminate(): void;
}

/**
* @internal
*
* Creates a stream adapter for throwing checksum errors for streams without
* buffering the stream.
*/
export const createChecksumStream = ({
expectedChecksum,
checksum,
source,
checksumSourceLocation,
base64Encoder,
}: ChecksumStreamInit): ReadableStreamType => {
if (!isReadableStream(source)) {
throw new Error(
`@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.`
);
}

const encoder = base64Encoder ?? toBase64;

if (typeof TransformStream !== "function") {
throw new Error(
"@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream."
);
}

const transform = new TransformStream({
start() {},
async transform(chunk: any, controller: TransformStreamDefaultController) {
/**
* When the upstream source flows data to this stream,
* calculate a step update of the checksum.
*/
checksum.update(chunk);
controller.enqueue(chunk);
},
async flush(controller: TransformStreamDefaultController) {
const digest: Uint8Array = await checksum.digest();
const received = encoder(digest);

if (expectedChecksum !== received) {
const error = new Error(
`Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` +
` in response header "${checksumSourceLocation}".`
);
controller.error(error);
} else {
controller.terminate();
}
},
});

source.pipeThrough(transform);
const readable = transform.readable;
Object.setPrototypeOf(readable, ChecksumStream.prototype);
return readable;
};
Loading

0 comments on commit f4e0bd9

Please sign in to comment.