Streaming 工程深度
5-1 讲了流式 UX 设计,6-8 讲了 Voice。但文本流式是 AI 应用最常见的形态,单独需要工程深度。前端工程师每天都在做的事,但很少做对。
学前说明
AI 应用 90% 的体验差异来自 streaming 实现。同样的模型、同样的 Prompt:
- 做得好:用户感觉 AI "在思考",体验流畅
- 做得差:等 30 秒一次性出来,用户觉得卡死
但 streaming 不只是"逐字显示"。涉及:
- 协议选型(SSE / WebSocket / HTTP/2 push)
- 流式 Tool Use(边流边调工具)
- 中断、重试、断线重连
- 前端 hooks 设计
- 后端架构(Edge / Serverless / 长连接)
本篇讲清楚每一块。
学习目标
- 选对流式协议(SSE vs WebSocket)
- 实现流式 Tool Use(不只是文本,还有结构化数据)
- 用 Vercel AI SDK / 自建处理流式
- 设计 React/Vue 端的流式 hooks
- 处理断线重连、重试、用户中断
- 边缘部署(Cloudflare Workers / Vercel Edge)
与现有知识的衔接
- 5-1 AI 产品 UX:UX 视角(前置)
- 6-8 Voice Agent:实时音频流(相关但不同)
- 04 Lethal Trifecta:流式输出可能带 prompt injection 渲染
第一章:协议选型
1.1 三种主流协议
| 协议 | 方向 | 浏览器支持 | 开销 | 适合 |
|---|---|---|---|---|
| Server-Sent Events (SSE) | 单向(服务端 → 客户端) | 原生 EventSource | 低 | LLM 输出流(标准) |
| WebSocket | 双向 | 原生 WebSocket | 中 | 需要双向(聊天 + 用户打断) |
| HTTP Streaming | 单向 | fetch + ReadableStream | 低 | 简单流(OpenAI / Anthropic SDK 用) |
1.2 SSE:默认选择
为什么 SSE 是 LLM streaming 的事实标准:
- 基于 HTTP(防火墙、代理友好)
- 内置自动重连(
EventSource的 retry) - 文本协议(debug 简单)
- 单向足够(LLM 输出是单向的)
OpenAI、Anthropic、Google 的 streaming API 都用 SSE。
// 服务端
app.get('/api/chat/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const stream = await llm.stream(req.query.message);
for await (const chunk of stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.write('data: [DONE]\n\n');
res.end();
});
// 客户端
const eventSource = new EventSource('/api/chat/stream?message=hi');
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const chunk = JSON.parse(event.data);
appendToUI(chunk);
};
1.3 WebSocket:需要双向时
什么时候必须 WebSocket:
- 用户实时打断(语音 Agent 必需)
- 服务端主动推送(多人协作、通知)
- 双向流(用户上传 + AI 输出同时)
// 客户端
const ws = new WebSocket('wss://api.example.com/chat');
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'token') appendToUI(message.text);
if (message.type === 'tool_call') showToolCall(message);
};
// 用户打断
function userInterrupt() {
ws.send(JSON.stringify({ type: 'interrupt' }));
}
代价:
- 长连接占资源(可观测性、负载均衡都更复杂)
- Edge 部署支持有限(Cloudflare Workers 支持 WebSocket,但有限制)
1.4 HTTP Streaming:简单但灵活
fetch + ReadableStream 也能做 streaming,且不需要 SSE 的特殊格式。
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ message: 'hi' })
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
appendToUI(text);
}
适合:
- 不需要事件区分(纯文本)
- 不需要自动重连
- 后端简单(直接 write)
OpenAI / Anthropic 的 SDK 内部用这种。
第二章:流式 Tool Use
文本 streaming 简单。带 Tool Use 的 streaming 复杂得多——因为 LLM 可能在流中突然决定调用工具,工具调用的参数也是流式的(一个 token 一个 token)。
2.1 Tool Use 流的事件类型
type StreamEvent =
| { type: 'message_start'; messageId: string }
| { type: 'content_block_start'; block: 'text' | 'tool_use' }
| { type: 'text_delta'; delta: string }
| { type: 'tool_use_start'; toolName: string; toolUseId: string }
| { type: 'tool_input_delta'; partialJson: string }
| { type: 'tool_use_end'; input: any }
| { type: 'content_block_end' }
| { type: 'message_stop'; stopReason: 'end_turn' | 'tool_use' | 'max_tokens' };
2.2 Anthropic 的 streaming API
const stream = await anthropic.messages.stream({
model: 'claude-sonnet-4-5',
messages: [{ role: 'user', content: 'What is the weather in Tokyo?' }],
tools: [...],
max_tokens: 1024,
});
for await (const event of stream) {
switch (event.type) {
case 'content_block_delta':
if (event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
if (event.delta.type === 'input_json_delta') {
// 工具参数流式输出
process.stdout.write(event.delta.partial_json);
}
break;
case 'content_block_stop':
// 一个内容块结束(可能是文本,可能是工具调用)
break;
}
}
2.3 处理流式 Tool 参数
工具参数是 JSON,但流式输出的 JSON 不完整:
First chunk: {"city
Next chunk: ":"To
Next chunk: kyo"}
不能 JSON.parse 中间状态。两种处理:
模式 A:等完整再 parse
let partialJson = '';
for await (const event of stream) {
if (event.type === 'tool_input_delta') {
partialJson += event.partial_json;
}
if (event.type === 'tool_use_end') {
const fullInput = JSON.parse(partialJson);
await executeTool(toolName, fullInput);
partialJson = '';
}
}
简单。但用户看不到工具调用的"过程"。
模式 B:流式 partial JSON parse
用 partial-json 这类库:
import { parse } from 'partial-json';
let partialJson = '';
for await (const event of stream) {
if (event.type === 'tool_input_delta') {
partialJson += event.partial_json;
// 尝试 partial parse(成功就是当前状态)
try {
const partial = parse(partialJson);
// 实时更新 UI:"正在查询天气,城市:东京..."
updateToolUI(partial);
} catch {
// 还不完整,等待
}
}
}
体验更好,但工程更复杂。Vercel AI SDK 内置这个。
2.4 Vercel AI SDK 的抽象
Vercel AI SDK 把这些细节都包了。
// 服务端(Next.js Route Handler)
import { streamText, tool } from 'ai';
export async function POST(req: Request) {
const { messages } = await req.json();
const result = await streamText({
model: anthropic('claude-sonnet-4-5'),
messages,
tools: {
getWeather: tool({
description: 'Get weather',
parameters: z.object({ city: z.string() }),
execute: async ({ city }) => fetchWeather(city),
}),
},
});
return result.toDataStreamResponse();
}
// 客户端(React)
import { useChat } from '@ai-sdk/react';
function Chat() {
const { messages, input, handleSubmit, handleInputChange } = useChat();
return (
<div>
{messages.map(m => (
<div key={m.id}>
{m.role}: {m.content}
{m.toolInvocations?.map(tool => (
<div>Calling {tool.toolName}({JSON.stringify(tool.args)})</div>
))}
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
</form>
</div>
);
}
useChat 内部处理:
- SSE 连接
- 流式 token 拼接
- 工具调用展示
- 自动重连
第三章:前端 Hooks 设计
不用 Vercel AI SDK 时,自己封装 hook。
3.1 useStreamingChat hook
import { useState, useRef, useCallback } from 'react';
interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
isStreaming?: boolean;
}
export function useStreamingChat() {
const [messages, setMessages] = useState<Message[]>([]);
const [isLoading, setIsLoading] = useState(false);
const abortRef = useRef<AbortController | null>(null);
const sendMessage = useCallback(async (userInput: string) => {
setIsLoading(true);
// 添加用户消息
const userMsg: Message = {
id: crypto.randomUUID(),
role: 'user',
content: userInput
};
// 添加 AI 占位消息
const aiMsgId = crypto.randomUUID();
const aiMsg: Message = {
id: aiMsgId,
role: 'assistant',
content: '',
isStreaming: true,
};
setMessages(prev => [...prev, userMsg, aiMsg]);
// 准备 abort
abortRef.current = new AbortController();
try {
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ messages: [...messages, userMsg] }),
signal: abortRef.current.signal,
});
if (!response.body) throw new Error('No body');
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 解析 SSE
const lines = buffer.split('\n\n');
buffer = lines.pop() ?? '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') continue;
const event = JSON.parse(data);
if (event.type === 'text') {
// 增量更新
setMessages(prev => prev.map(m =>
m.id === aiMsgId
? { ...m, content: m.content + event.text }
: m
));
}
}
}
// 标记完成
setMessages(prev => prev.map(m =>
m.id === aiMsgId ? { ...m, isStreaming: false } : m
));
} catch (error) {
if ((error as Error).name === 'AbortError') {
// 用户中断
setMessages(prev => prev.map(m =>
m.id === aiMsgId ? { ...m, isStreaming: false, content: m.content + ' [中断]' } : m
));
} else {
// 其他错误
console.error(error);
}
} finally {
setIsLoading(false);
abortRef.current = null;
}
}, [messages]);
const stop = useCallback(() => {
abortRef.current?.abort();
}, []);
return { messages, sendMessage, stop, isLoading };
}
3.2 处理工具调用展示
interface AssistantMessage {
id: string;
role: 'assistant';
content: string;
toolCalls?: Array<{
id: string;
name: string;
args: any; // 可能是 partial
status: 'pending' | 'executing' | 'done';
result?: any;
}>;
isStreaming: boolean;
}
// 在流式处理中
case 'tool_use_start':
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: [...(msg.toolCalls ?? []), {
id: event.toolUseId,
name: event.toolName,
args: {},
status: 'pending',
}]
})));
break;
case 'tool_input_delta':
// 用 partial-json 解析
const partial = parsePartialJson(buffer);
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: msg.toolCalls!.map(t =>
t.id === event.toolUseId ? { ...t, args: partial } : t
)
})));
break;
case 'tool_executing':
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: msg.toolCalls!.map(t =>
t.id === event.toolUseId ? { ...t, status: 'executing' } : t
)
})));
break;
case 'tool_result':
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: msg.toolCalls!.map(t =>
t.id === event.toolUseId ? { ...t, status: 'done', result: event.result } : t
)
})));
break;
3.3 渲染优化
流式渲染每秒可能 50+ 次 setState。React 默认会卡。
优化 1:批量更新
// 不要每个 token 都 setState
let pendingText = '';
let flushTimer: number | null = null;
function appendToken(token: string) {
pendingText += token;
if (!flushTimer) {
flushTimer = requestAnimationFrame(() => {
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
content: msg.content + pendingText
})));
pendingText = '';
flushTimer = null;
});
}
}
优化 2:避免父组件重渲染
// ❌ 父组件保存所有 messages 状态,每次更新触发全列表重渲染
function ChatList() {
const { messages } = useStreamingChat(); // 每个 token 触发更新
return messages.map(m => <Message {...m} />);
}
// ✅ 把流式状态抽到子组件
function ChatList() {
const messageIds = useMessageIds(); // 只订阅 id 列表
return messageIds.map(id => <MessageById id={id} />);
}
function MessageById({ id }: { id: string }) {
const message = useMessage(id); // 只订阅这条 message
// ...
}
优化 3:Markdown 流式渲染
// 流式 markdown 必须处理"半成品"
// 比如还没收完 ```typescript ... ```
import { Streamdown } from 'streamdown'; // 专门为流式设计
<Streamdown>{message.content}</Streamdown>
第四章:断线重连与重试
4.1 断线场景
- 用户切到其他网络(Wi-Fi → 4G)
- 服务端临时挂
- Cloudflare 等中间层断
- 浏览器前后台切换
4.2 EventSource 自带重连
const es = new EventSource('/api/chat');
// 默认连接失败会自动 retry
// 服务端可以发 retry: 5000 让客户端等 5 秒
但这是"断了从头连"。AI 流通常不能从头连——会重新生成不同的回复。
4.3 断点续传
需要服务端支持 resumable streaming:
// 服务端为每个流分配 ID 并暂存
const streams = new Map<string, StreamState>();
app.get('/api/chat/start', async (req, res) => {
const streamId = crypto.randomUUID();
const state = { tokens: [], finished: false };
streams.set(streamId, state);
// 异步生成
generateAndStore(streamId, req.query.message);
res.json({ streamId });
});
app.get('/api/chat/stream/:id', async (req, res) => {
const { id } = req.params;
const sinceToken = parseInt(req.query.since as string) || 0;
const state = streams.get(id);
// 发已有 tokens
for (let i = sinceToken; i < state.tokens.length; i++) {
res.write(`data: ${JSON.stringify({ index: i, ...state.tokens[i] })}\n\n`);
}
// 等新 tokens(如果还在生成)
// ... 用 EventEmitter / Redis pub-sub
});
// 客户端
function reconnect(streamId: string, lastTokenIndex: number) {
const es = new EventSource(`/api/chat/stream/${streamId}?since=${lastTokenIndex}`);
// ...
}
复杂但用户体验好。Vercel AI SDK v3 已经在做这个方向。
4.4 重试策略
如果断了不能续:
async function chatWithRetry(message: string) {
const MAX_RETRIES = 3;
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
try {
return await chat(message);
} catch (error) {
if (isRetryable(error)) {
await sleep(1000 * 2 ** attempt); // 指数退避
continue;
}
throw error;
}
}
throw new Error('max_retries_exceeded');
}
function isRetryable(error: any): boolean {
if (error.code === 'ECONNRESET') return true;
if (error.code === 'ETIMEDOUT') return true;
if (error.status === 429) return true; // rate limit
if (error.status >= 500) return true;
return false;
}
第五章:用户中断
5.1 中断的两种含义
1. 取消生成:用户不想要这个回答
- 实现:客户端 abort fetch,服务端关闭 LLM stream
2. 打断并新输入:用户想换个问题
- 实现:先中断,再发新请求
5.2 客户端实现
const abortController = new AbortController();
const response = await fetch('/api/chat', {
signal: abortController.signal,
// ...
});
// 用户点"停止"
function userStop() {
abortController.abort();
}
5.3 服务端实现
app.post('/api/chat', async (req, res) => {
const stream = await llm.stream(req.body);
// 监听客户端断开
req.on('close', () => {
stream.controller?.abort(); // 关闭 LLM 调用
console.log('client disconnected, aborted llm');
});
for await (const chunk of stream) {
if (res.destroyed) break; // 已经断开
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.end();
});
关键:客户端断开后必须真的停止 LLM 调用,否则继续烧 token。
第六章:边缘部署
6.1 为什么 AI 应用适合 Edge
- 用户和 LLM API 之间加 Edge 层
- 减少延迟(Edge 节点离用户近)
- Streaming 在 Edge 自然支持
- Serverless 自动扩展
6.2 Vercel Edge Function
// app/api/chat/route.ts
export const runtime = 'edge';
export async function POST(req: Request) {
const { messages } = await req.json();
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages,
stream: true,
});
// 直接 pipe
return new Response(stream.toReadableStream(), {
headers: { 'Content-Type': 'text/event-stream' }
});
}
优点:
- 部署到全球 100+ 节点
- 冷启动 < 100ms
- 自动扩展
- 不收费 / 用量低成本
限制:
- 执行时间(Vercel 有 maxDuration)
- 只能用 Web 标准 API(不能用 Node-only 库)
- WebSocket 支持有限
6.3 Cloudflare Workers AI
更激进:连 LLM 都跑在 Edge。
export default {
async fetch(request: Request, env: Env) {
const { messages } = await request.json();
// 用 Cloudflare 的 Workers AI 跑模型
const response = await env.AI.run('@cf/meta/llama-3-8b-instruct', {
messages,
stream: true,
});
return new Response(response, {
headers: { 'Content-Type': 'text/event-stream' }
});
}
};
适合:用开源模型、对延迟极敏感、成本敏感。
6.4 Edge 的限制
不是所有场景都该 Edge:
- 需要数据库连接(需要 connection pooling)
- 需要 Node.js 特定 API(fs、child_process)
- 长任务(> 5 分钟)
- 需要 WebSocket(部分平台支持有限)
第七章:实战架构
7.1 简单聊天应用
浏览器 (React + AI SDK)
↓ POST /api/chat (SSE)
Vercel Edge Function
↓ Anthropic SDK
Anthropic API
特点:
- 全 SSE
- 无状态(每次请求带完整 history)
- Edge 部署
7.2 带工具调用的 Agent
浏览器
↓ SSE
Edge Function
↓ 调 LLM(流式)
LLM
↓ 决定调工具
Edge Function (执行工具)
↓ 把结果加进 messages,再调 LLM
LLM 继续生成
↓ 流回客户端
关键:在一个连接里完成多轮 LLM ↔ Tool 互动。Vercel AI SDK 内置这个 loop。
7.3 长任务(异步)
浏览器
↓ POST /api/task/start
Edge Function
↓ 入队(Redis / SQS)
Worker(长任务,跑几分钟到几小时)
↓ 进度更新到 KV
浏览器
← SSE /api/task/:id/stream
Worker(写完成结果到 KV)
适合:研究 Agent、批量处理、Codex Cloud 类。
7.4 多人协作场景
浏览器 A 浏览器 B
↓ WS ↓ WS
WebSocket Hub(Cloudflare Durable Object)
↓ 转发
LLM 调用 + 流式给所有连接的客户端
适合:协同编辑、群聊 Agent。
第八章:监控与调试
8.1 关键指标
| 指标 | 健康范围 |
|---|---|
| Time to first token (TTFT) | < 1s |
| Token rate | > 30 tokens/s |
| 总耗时 | < 30s(普通对话) |
| 断线率 | < 1% |
| 中断后续连成功率 | > 95%(如果实现了续连) |
8.2 调试工具
Chrome DevTools:
- Network 面板看
text/event-stream - 可以看每个 SSE 事件
Vercel AI SDK DevTools:
- 内置 debug overlay
- 看每个 stream event
- 看 tool 调用顺序
8.3 服务端日志
async function* loggedStream(stream: AsyncIterable<any>) {
let tokenCount = 0;
let firstTokenTime: number | null = null;
const start = Date.now();
for await (const chunk of stream) {
if (firstTokenTime === null) {
firstTokenTime = Date.now();
logger.info('TTFT', { ms: firstTokenTime - start });
}
tokenCount++;
yield chunk;
}
logger.info('stream complete', {
totalMs: Date.now() - start,
tokenCount,
rate: tokenCount / ((Date.now() - start) / 1000)
});
}
第九章:踩坑总结
9.1 常见反模式
| 反模式 | 后果 | 修正 |
|---|---|---|
| 不批量 setState | 50fps 卡顿 | requestAnimationFrame |
| 整个对话一个 React 组件 | 流式时全列表重渲染 | 拆子组件,按 id 订阅 |
| JSON.parse 流式 partial | 抛错 | 用 partial-json 库 |
| 客户端断了不取消 LLM | 烧 token | req.on('close') 取消 |
| 用 WebSocket 但不需要双向 | 复杂度高 | 用 SSE |
| 不处理 markdown 流式 | 渲染半成品代码块出错 | 用 Streamdown |
| Edge 用 Node-only 库 | 部署失败 | 看 SDK 是否支持 Edge runtime |
| 长任务用 Edge | 超时 | 用 Worker + 异步 |
9.2 性能调优清单
- TTFT < 1s(首 token 时间)
- Token rate > 30/s
- React 渲染 > 30 FPS
- 断线自动重试(至少 1 次)
- 用户中断生效 < 100ms
- 工具调用展示流式参数
- Markdown 渲染处理半成品
- 监控 TTFT、token rate、断线率
第十章:未来方向
10.1 Resumable Streaming
Vercel 在推 resumable-stream。让流可以断点续传,类似视频播放。
2026 年开始有标准化方案。
10.2 双向流 + Tool Use
OpenAI Realtime API 已经做了——文本 + 音频 + 工具调用全在一个 WebSocket。
下一步:标准化、跨厂商兼容。
10.3 Server Components Streaming
React Server Components + AI 流式:
// 服务端组件直接流式 LLM 输出
async function AIResponse({ message }: { message: string }) {
const stream = await llm.stream(message);
return <StreamingText stream={stream} />;
}
减少客户端 JS,更快首屏。
10.4 边缘原生 LLM
Cloudflare Workers AI、Vercel AI Gateway —— LLM 推理本身在 Edge。
挑战:模型 size 大,边缘节点存储有限。但 1B-3B 小模型已经可以全 Edge 跑。
权威资料
- Vercel AI SDK 文档
- OpenAI Streaming Best Practices
- Anthropic Messages Streaming
- Cloudflare Workers AI
- Server-Sent Events (MDN)
- partial-json-parser
- Streamdown
- 5-1 AI 产品 UX(前置)
- 6-8 Voice Agent 与实时 AI
核对日期:2026-06-12