Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: XStream 返回流同时支持异步迭代 #319

Merged
merged 22 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions components/useXChat/demo/stream-cancel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## zh-CN

打断正在流式输出的内容。

## en-US

Interrupt the ongoing streaming output.
119 changes: 119 additions & 0 deletions components/useXChat/demo/stream-cancel.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { UserOutlined } from '@ant-design/icons';
import { Bubble, Sender, useXAgent, useXChat, XStream } from '@ant-design/x';
import { Flex, type GetProp } from 'antd';
import React, { useEffect, useRef } from 'react';

const roles: GetProp<typeof Bubble.List, 'roles'> = {
ai: {
placement: 'start',
avatar: { icon: <UserOutlined />, style: { background: '#fde3cf' } },
},
local: {
placement: 'end',
avatar: { icon: <UserOutlined />, style: { background: '#87d068' } },
},
};

const contentChunks = [
'He',
'llo',
', w',
'or',
'ld!',
' Ant',
' Design',
' X',
' is',
' the',
' best',
'!',
];

function mockReadableStream() {
const sseChunks: string[] = [];

for (let i = 0; i < contentChunks.length; i++) {
const sseEventPart = `event: message\ndata: {"id":"${i}","content":"${contentChunks[i]}"}\n\n`;
sseChunks.push(sseEventPart);
}

return new ReadableStream({
async start(controller) {
for (const chunk of sseChunks) {
await new Promise((resolve) => setTimeout(resolve, 300));
controller.enqueue(new TextEncoder().encode(chunk));
}
controller.close();
},
});
}
YumoImer marked this conversation as resolved.
Show resolved Hide resolved

const App = () => {
const [content, setContent] = React.useState('');

const abortRef = useRef(() => {});

useEffect(() => {
return () => {
abortRef.current();
};
}, []);

// Agent for request
const [agent] = useXAgent({
request: async ({}, { onSuccess, onUpdate }) => {
YumoImer marked this conversation as resolved.
Show resolved Hide resolved
const stream = XStream({
readableStream: mockReadableStream(),
});

const reader = stream.getReader();
abortRef.current = () => {
reader?.cancel();
};
ppbl marked this conversation as resolved.
Show resolved Hide resolved

let current = '';
while (reader) {
const { value, done } = await reader.read();
if (done) {
onSuccess(current);
break;
}
if (!value) continue;
const data = JSON.parse(value.data);
current += data.content || '';
onUpdate(current);
}
YumoImer marked this conversation as resolved.
Show resolved Hide resolved
},
});

// Chat messages
const { onRequest, messages } = useXChat({
agent,
});

return (
<Flex vertical gap="middle">
<Bubble.List
roles={roles}
style={{ maxHeight: 300 }}
items={messages.map(({ id, message, status }) => ({
key: id,
role: status === 'local' ? 'local' : 'ai',
content: message,
}))}
/>
<Sender
loading={agent.isRequesting()}
value={content}
onChange={setContent}
onSubmit={(nextContent) => {
onRequest(nextContent);
setContent('');
}}
onCancel={() => abortRef.current()}
/>
</Flex>
);
};

export default App;
1 change: 1 addition & 0 deletions components/useXChat/index.en-US.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Use Agent to manage conversation data and produce data for page rendering.
<!-- prettier-ignore -->
<code src="./demo/basic.tsx">Basic</code>
<code src="./demo/stream.tsx">Streaming</code>
<code src="./demo/stream-cancel.tsx">Interrupt the output</code>
<code src="./demo/suggestions.tsx">Multiple Suggestion</code>

## API
Expand Down
1 change: 1 addition & 0 deletions components/useXChat/index.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ demo:
<!-- prettier-ignore -->
<code src="./demo/basic.tsx">基本</code>
<code src="./demo/stream.tsx">流式输出</code>
<code src="./demo/stream-cancel.tsx">打断输出</code>
<code src="./demo/suggestions.tsx">多项建议</code>

## API
Expand Down
13 changes: 13 additions & 0 deletions components/x-stream/__tests__/index.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,17 @@ describe('XStream', () => {
})(),
).rejects.toThrow('The key-value separator ":" is not found in the sse line chunk!');
});

it('should return an instance of ReadableStream', () => {
expect(
XStream({
readableStream: new ReadableStream({
async start(controller) {
controller.enqueue(new TextEncoder().encode('event: message\n\ndata: value\n\n'));
controller.close();
},
}),
}),
).toBeInstanceOf(ReadableStream);
});
});
61 changes: 34 additions & 27 deletions components/x-stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ export interface XStreamOptions<Output> {
transformStream?: TransformStream<string, Output>;
}

type XReadableStream<R = SSEOutput> = ReadableStream<R> & AsyncGenerator<R>;

/**
* @description Transform Uint8Array binary stream to {@link SSEOutput} by default
* @warning The `XStream` only support the `utf-8` encoding. More encoding support maybe in the future.
*/
async function* XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
function XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
const { readableStream, transformStream } = options;

if (!(readableStream instanceof ReadableStream)) {
Expand All @@ -151,33 +153,38 @@ async function* XStream<Output = SSEOutput>(options: XStreamOptions<Output>) {
// Default encoding is `utf-8`
const decoderStream = new TextDecoderStream();

const stream = transformStream
? /**
* Uint8Array binary -> string -> Output
*/
readableStream
.pipeThrough(decoderStream)
.pipeThrough(transformStream)
: /**
* Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput}
*/
readableStream
.pipeThrough(decoderStream)
.pipeThrough(splitStream())
.pipeThrough(splitPart());

const reader = stream.getReader() as ReadableStreamDefaultReader<Output>;

while (reader instanceof ReadableStreamDefaultReader) {
const { value, done } = await reader.read();

if (done) break;

if (!value) continue;

// Transformed data through all transform pipes
yield value;
const stream = (
transformStream
? /**
* Uint8Array binary -> string -> Output
*/
readableStream.pipeThrough(decoderStream).pipeThrough(transformStream)
: /**
* Uint8Array binary -> string -> SSE part string -> Default Output {@link SSEOutput}
*/
readableStream
.pipeThrough(decoderStream)
.pipeThrough(splitStream())
.pipeThrough(splitPart())
) as XReadableStream<Output>;

/** support async iterator */
stream[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();

while (true) {
const { done, value } = await reader.read();

if (done) break;

if (!value) continue;

// Transformed data through all transform pipes
yield value;
}
ppbl marked this conversation as resolved.
Show resolved Hide resolved
}

return stream;
}

export default XStream;
Loading