Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: XStream 返回流同时支持异步迭代 #319

Merged
merged 22 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions components/x-stream/__tests__/index.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,17 @@ describe('XStream', () => {
})(),
).rejects.toThrow('The key-value separator ":" is not found in the sse line chunk!');
});

it('should return an instance of ReadableStream', () => {
expect(
XStream({
readableStream: new ReadableStream({
async start(controller) {
controller.enqueue(new TextEncoder().encode('event: message\n\ndata: value\n\n'));
controller.close();
},
}),
}),
).toBeInstanceOf(ReadableStream);
});
});
61 changes: 34 additions & 27 deletions components/x-stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ export interface XStreamOptions<Output> {
transformStream?: TransformStream<string, Output>;
}

type XReadableStream<R = SSEOutput> = ReadableStream<R> & AsyncGenerator<R>;

/**
* @description Transform Uint8Array binary stream to {@link SSEOutput} by default
* @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future.
*/
async function* XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
function XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
const { readableStream, transformStream } = options;

if (!(readableStream instanceof ReadableStream)) {
Expand All @@ -151,33 +153,38 @@ async function* XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
// Default encoding is `utf-8`
const decoderStream = new TextDecoderStream();

const stream = transformStream
? /**
* Uint8Array binary -> string -> Output
*/
readableStream
.pipeThrough(decoderStream)
.pipeThrough(transformStream)
: /**
* Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput}
*/
readableStream
.pipeThrough(decoderStream)
.pipeThrough(splitStream())
.pipeThrough(splitPart());

const reader = stream.getReader() as ReadableStreamDefaultReader<Output>;

while (reader instanceof ReadableStreamDefaultReader) {
const { value, done } = await reader.read();

if (done) break;

if (!value) continue;

// Transformed data through all transform pipes
yield value;
const stream = (
transformStream
? /**
* Uint8Array binary -> string -> Output
*/
readableStream.pipeThrough(decoderStream).pipeThrough(transformStream)
: /**
* Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput}
*/
readableStream
.pipeThrough(decoderStream)
.pipeThrough(splitStream())
.pipeThrough(splitPart())
) as XReadableStream<Output>;

/** support async iterator */
stream[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();

while (true) {
const { done, value } = await reader.read();

if (done) break;

if (!value) continue;

// Transformed data through all transform pipes
yield value;
}
ppbl marked this conversation as resolved.
Show resolved Hide resolved
}

return stream;
}

export default XStream;
Loading