前文:手把手教你在浏览器和RUST中处理流式传输 提到如何简单的处理流式输出,但是后来发现这个写法有bug,下面讲解一下更好的写法
顺便补充一下,上一篇文章提到的IterableReadableStream
来自@langchain/core
,你可以这样导入使用:
1
| import { IterableReadableStream } from '@langchain/core/utils/stream'
|
处理Event Stream
除了上一章的ndjson以外,最常用就是Event Stream了,包括OpenAi等一众ai服务提供商都会提供sse接口,并且以Event Stream的格式进行输出,先来看看ai是怎么理解Event Stream和SSE的:
Server-Sent Events (SSE) ,一种基于 HTTP 的轻量协议,允许服务器向客户端推送实时数据流。
SSE 格式规范:
示例:
1 2 3 4 5 6 7
| event: status_update data: {"user": "Alice", "status": "online"}
id: 12345 data: This is a message.
retry: 3000
|
那再来看看ai输出的结果:
![image.png]()
很标准的text/event-stream
格式
使用langchainjs处理
你以为我要像上一篇一样开始手搓处理代码了吗,no no no,我们还是使用langchainjs进行处理,原因后面会提到。
这里推荐一个fetch封装工具:ofetch
,一个类似axios
的库,作用大家应该都懂了吧,这里我拿火山的接口来演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| export default defineConfig({ base: "/", server: { proxy: { "/huoshan": { changeOrigin: true, ws: true, secure: false, target: "https://ark.cn-beijing.volces.com", rewrite: (path) => path.replace(/^\/huoshan/, ""), }, }, }, });
module.export = { devServer: { compress: false, proxy: { '/huoshan': { target: 'https://ark.cn-beijing.volces.com', changeOrigin: true, ws: true, secure: false, pathRewrite: { '^/huoshan': '', }, }, } } }
|
如果是webpack的话,一定要关闭devServer
的compress
,不然会导致整个请求结束才返回,这样就不是流式输出了。
1 2 3 4 5 6 7 8 9 10 11
|
import { ofetch } from "ofetch";
export const fetchRequest = ofetch.create({ baseURL: '/huoshan', timeout: 60000, onRequest({ options }) { options.headers.set('Authorization', 'Bearer xxxxx') }, })
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import { fetchRequest } from "./request"; import { convertEventStreamToIterableReadableDataStream } from "@langchain/core/utils/event_source_parse";
async function test() { const res = await fetchRequest("/api/v3/chat/completions", { responseType: "stream", method: "post", body: { model: "deepseek-v3-250324", messages: [ { role: "user", content: "你是谁?", }, ], stream: true, }, }); const stream = convertEventStreamToIterableReadableDataStream(res); for await (const chunk of stream) { console.log(chunk); } } test()
|
![image.png]()
返回正常,不过要注意,结尾有个[DONE]
,所以不能无脑反序列化,
1 2 3 4 5
| for await (const chunk of stream) { if (chunk !== '[DONE]') { console.log(JSON.parse(chunk)) } }
|
这样就拿到每个chunk了,当然你可以将test方法改成生成器,然后for
里面yield JSON.parse(chunk)
为什么要用langchainjs封装好的方法处理
既然大家都知道流式输出是一个一个chunk的方式返回,那么是不是有可能一行的文本,拆分成两个chunk(在js看来是ArrayBuffer)?而一个utf8字符是定长的,可能是1-3字节,那是不是有可能在某个字符的时候,其中一部分字节拆分到一个chunk,然后剩下部分字节拆分到下一个chunk?
这样就会导致你在decode的时候发生报错,无法正常decode成文字,所以langchainjs的方法考虑到这个情况:
![image.png]()
代码在:https://github.com/langchain-ai/langchainjs/blob/5100a9d0a1eda7b7998dd40624abdd3ff3002b36/langchain-core/src/utils/event_source_parse.ts#L88-L165
其他关注点
使用代理时需要注意
上面的webpack配置已经讲解了一下devServer应该怎么配置才能流式输出。还有就是使用nginx代理的时候也需要修改一下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| server { listen 80; location /huoshan/ { # http1.1才支持长连接 proxy_http_version 1.1; # 关闭代理缓冲 proxy_buffering off; # 设置代理缓冲区大小 proxy_buffer_size 10k; # 设置代理缓冲区数量和大小 proxy_buffers 4 10k; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_pass https://ark.cn-beijing.volces.com/; }
}
|
其实就是关闭一些代理缓冲,以及设置一下缓冲区,为什么要这样设置,这里有请懂nginx配置的大佬细说一下😜