跳到主要内容

Streaming 工程深度

5-1 讲了流式 UX 设计,6-8 讲了 Voice。但文本流式是 AI 应用最常见的形态,单独需要工程深度。前端工程师每天都在做的事,但很少做对。

学前说明

AI 应用 90% 的体验差异来自 streaming 实现。同样的模型、同样的 Prompt:

  • 做得好:用户感觉 AI "在思考",体验流畅
  • 做得差:等 30 秒一次性出来,用户觉得卡死

但 streaming 不只是"逐字显示"。涉及:

  • 协议选型(SSE / WebSocket / HTTP/2 push)
  • 流式 Tool Use(边流边调工具)
  • 中断、重试、断线重连
  • 前端 hooks 设计
  • 后端架构(Edge / Serverless / 长连接)

本篇讲清楚每一块。

学习目标

  • 选对流式协议(SSE vs WebSocket)
  • 实现流式 Tool Use(不只是文本,还有结构化数据)
  • 用 Vercel AI SDK / 自建处理流式
  • 设计 React/Vue 端的流式 hooks
  • 处理断线重连、重试、用户中断
  • 边缘部署(Cloudflare Workers / Vercel Edge)

与现有知识的衔接

  • 5-1 AI 产品 UX:UX 视角(前置)
  • 6-8 Voice Agent:实时音频流(相关但不同)
  • 04 Lethal Trifecta:流式输出可能带 prompt injection 渲染

第一章:协议选型

1.1 三种主流协议

协议方向浏览器支持开销适合
Server-Sent Events (SSE)单向(服务端 → 客户端)原生 EventSourceLLM 输出流(标准)
WebSocket双向原生 WebSocket需要双向(聊天 + 用户打断)
HTTP Streaming单向fetch + ReadableStream简单流(OpenAI / Anthropic SDK 用)

1.2 SSE:默认选择

为什么 SSE 是 LLM streaming 的事实标准:

  • 基于 HTTP(防火墙、代理友好)
  • 内置自动重连(EventSource 的 retry)
  • 文本协议(debug 简单)
  • 单向足够(LLM 输出是单向的)

OpenAI、Anthropic、Google 的 streaming API 都用 SSE。

// 服务端
app.get('/api/chat/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');

const stream = await llm.stream(req.query.message);

for await (const chunk of stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}

res.write('data: [DONE]\n\n');
res.end();
});

// 客户端
const eventSource = new EventSource('/api/chat/stream?message=hi');
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const chunk = JSON.parse(event.data);
appendToUI(chunk);
};

1.3 WebSocket:需要双向时

什么时候必须 WebSocket:

  • 用户实时打断(语音 Agent 必需)
  • 服务端主动推送(多人协作、通知)
  • 双向流(用户上传 + AI 输出同时)
// 客户端
const ws = new WebSocket('wss://api.example.com/chat');

ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'token') appendToUI(message.text);
if (message.type === 'tool_call') showToolCall(message);
};

// 用户打断
function userInterrupt() {
ws.send(JSON.stringify({ type: 'interrupt' }));
}

代价:

  • 长连接占资源(可观测性、负载均衡都更复杂)
  • Edge 部署支持有限(Cloudflare Workers 支持 WebSocket,但有限制)

1.4 HTTP Streaming:简单但灵活

fetch + ReadableStream 也能做 streaming,且不需要 SSE 的特殊格式。

const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ message: 'hi' })
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
appendToUI(text);
}

适合:

  • 不需要事件区分(纯文本)
  • 不需要自动重连
  • 后端简单(直接 write)

OpenAI / Anthropic 的 SDK 内部用这种。


第二章:流式 Tool Use

文本 streaming 简单。带 Tool Use 的 streaming 复杂得多——因为 LLM 可能在流中突然决定调用工具,工具调用的参数也是流式的(一个 token 一个 token)。

2.1 Tool Use 流的事件类型

type StreamEvent =
| { type: 'message_start'; messageId: string }
| { type: 'content_block_start'; block: 'text' | 'tool_use' }
| { type: 'text_delta'; delta: string }
| { type: 'tool_use_start'; toolName: string; toolUseId: string }
| { type: 'tool_input_delta'; partialJson: string }
| { type: 'tool_use_end'; input: any }
| { type: 'content_block_end' }
| { type: 'message_stop'; stopReason: 'end_turn' | 'tool_use' | 'max_tokens' };

2.2 Anthropic 的 streaming API

const stream = await anthropic.messages.stream({
model: 'claude-sonnet-4-5',
messages: [{ role: 'user', content: 'What is the weather in Tokyo?' }],
tools: [...],
max_tokens: 1024,
});

for await (const event of stream) {
switch (event.type) {
case 'content_block_delta':
if (event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
if (event.delta.type === 'input_json_delta') {
// 工具参数流式输出
process.stdout.write(event.delta.partial_json);
}
break;

case 'content_block_stop':
// 一个内容块结束(可能是文本,可能是工具调用)
break;
}
}

2.3 处理流式 Tool 参数

工具参数是 JSON,但流式输出的 JSON 不完整:

First chunk: {"city
Next chunk: ":"To
Next chunk: kyo"}

不能 JSON.parse 中间状态。两种处理:

模式 A:等完整再 parse

let partialJson = '';
for await (const event of stream) {
if (event.type === 'tool_input_delta') {
partialJson += event.partial_json;
}
if (event.type === 'tool_use_end') {
const fullInput = JSON.parse(partialJson);
await executeTool(toolName, fullInput);
partialJson = '';
}
}

简单。但用户看不到工具调用的"过程"。

模式 B:流式 partial JSON parse

partial-json 这类库:

import { parse } from 'partial-json';

let partialJson = '';
for await (const event of stream) {
if (event.type === 'tool_input_delta') {
partialJson += event.partial_json;

// 尝试 partial parse(成功就是当前状态)
try {
const partial = parse(partialJson);
// 实时更新 UI:"正在查询天气,城市:东京..."
updateToolUI(partial);
} catch {
// 还不完整,等待
}
}
}

体验更好,但工程更复杂。Vercel AI SDK 内置这个。

2.4 Vercel AI SDK 的抽象

Vercel AI SDK 把这些细节都包了。

// 服务端(Next.js Route Handler)
import { streamText, tool } from 'ai';

export async function POST(req: Request) {
const { messages } = await req.json();

const result = await streamText({
model: anthropic('claude-sonnet-4-5'),
messages,
tools: {
getWeather: tool({
description: 'Get weather',
parameters: z.object({ city: z.string() }),
execute: async ({ city }) => fetchWeather(city),
}),
},
});

return result.toDataStreamResponse();
}

// 客户端(React)
import { useChat } from '@ai-sdk/react';

function Chat() {
const { messages, input, handleSubmit, handleInputChange } = useChat();

return (
<div>
{messages.map(m => (
<div key={m.id}>
{m.role}: {m.content}
{m.toolInvocations?.map(tool => (
<div>Calling {tool.toolName}({JSON.stringify(tool.args)})</div>
))}
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
</form>
</div>
);
}

useChat 内部处理:

  • SSE 连接
  • 流式 token 拼接
  • 工具调用展示
  • 自动重连

第三章:前端 Hooks 设计

不用 Vercel AI SDK 时,自己封装 hook。

3.1 useStreamingChat hook

import { useState, useRef, useCallback } from 'react';

interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
isStreaming?: boolean;
}

export function useStreamingChat() {
const [messages, setMessages] = useState<Message[]>([]);
const [isLoading, setIsLoading] = useState(false);
const abortRef = useRef<AbortController | null>(null);

const sendMessage = useCallback(async (userInput: string) => {
setIsLoading(true);

// 添加用户消息
const userMsg: Message = {
id: crypto.randomUUID(),
role: 'user',
content: userInput
};

// 添加 AI 占位消息
const aiMsgId = crypto.randomUUID();
const aiMsg: Message = {
id: aiMsgId,
role: 'assistant',
content: '',
isStreaming: true,
};

setMessages(prev => [...prev, userMsg, aiMsg]);

// 准备 abort
abortRef.current = new AbortController();

try {
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ messages: [...messages, userMsg] }),
signal: abortRef.current.signal,
});

if (!response.body) throw new Error('No body');

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { done, value } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });

// 解析 SSE
const lines = buffer.split('\n\n');
buffer = lines.pop() ?? '';

for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') continue;

const event = JSON.parse(data);
if (event.type === 'text') {
// 增量更新
setMessages(prev => prev.map(m =>
m.id === aiMsgId
? { ...m, content: m.content + event.text }
: m
));
}
}
}

// 标记完成
setMessages(prev => prev.map(m =>
m.id === aiMsgId ? { ...m, isStreaming: false } : m
));
} catch (error) {
if ((error as Error).name === 'AbortError') {
// 用户中断
setMessages(prev => prev.map(m =>
m.id === aiMsgId ? { ...m, isStreaming: false, content: m.content + ' [中断]' } : m
));
} else {
// 其他错误
console.error(error);
}
} finally {
setIsLoading(false);
abortRef.current = null;
}
}, [messages]);

const stop = useCallback(() => {
abortRef.current?.abort();
}, []);

return { messages, sendMessage, stop, isLoading };
}

3.2 处理工具调用展示

interface AssistantMessage {
id: string;
role: 'assistant';
content: string;
toolCalls?: Array<{
id: string;
name: string;
args: any; // 可能是 partial
status: 'pending' | 'executing' | 'done';
result?: any;
}>;
isStreaming: boolean;
}

// 在流式处理中
case 'tool_use_start':
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: [...(msg.toolCalls ?? []), {
id: event.toolUseId,
name: event.toolName,
args: {},
status: 'pending',
}]
})));
break;

case 'tool_input_delta':
// 用 partial-json 解析
const partial = parsePartialJson(buffer);
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: msg.toolCalls!.map(t =>
t.id === event.toolUseId ? { ...t, args: partial } : t
)
})));
break;

case 'tool_executing':
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: msg.toolCalls!.map(t =>
t.id === event.toolUseId ? { ...t, status: 'executing' } : t
)
})));
break;

case 'tool_result':
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
toolCalls: msg.toolCalls!.map(t =>
t.id === event.toolUseId ? { ...t, status: 'done', result: event.result } : t
)
})));
break;

3.3 渲染优化

流式渲染每秒可能 50+ 次 setState。React 默认会卡。

优化 1:批量更新

// 不要每个 token 都 setState
let pendingText = '';
let flushTimer: number | null = null;

function appendToken(token: string) {
pendingText += token;

if (!flushTimer) {
flushTimer = requestAnimationFrame(() => {
setMessages(prev => updateAiMsg(prev, msg => ({
...msg,
content: msg.content + pendingText
})));
pendingText = '';
flushTimer = null;
});
}
}

优化 2:避免父组件重渲染

// ❌ 父组件保存所有 messages 状态,每次更新触发全列表重渲染
function ChatList() {
const { messages } = useStreamingChat(); // 每个 token 触发更新
return messages.map(m => <Message {...m} />);
}

// ✅ 把流式状态抽到子组件
function ChatList() {
const messageIds = useMessageIds(); // 只订阅 id 列表
return messageIds.map(id => <MessageById id={id} />);
}

function MessageById({ id }: { id: string }) {
const message = useMessage(id); // 只订阅这条 message
// ...
}

优化 3:Markdown 流式渲染

// 流式 markdown 必须处理"半成品"
// 比如还没收完 ```typescript ... ```
import { Streamdown } from 'streamdown'; // 专门为流式设计

<Streamdown>{message.content}</Streamdown>

第四章:断线重连与重试

4.1 断线场景

  • 用户切到其他网络(Wi-Fi → 4G)
  • 服务端临时挂
  • Cloudflare 等中间层断
  • 浏览器前后台切换

4.2 EventSource 自带重连

const es = new EventSource('/api/chat');
// 默认连接失败会自动 retry
// 服务端可以发 retry: 5000 让客户端等 5 秒

但这是"断了从头连"。AI 流通常不能从头连——会重新生成不同的回复。

4.3 断点续传

需要服务端支持 resumable streaming:

// 服务端为每个流分配 ID 并暂存
const streams = new Map<string, StreamState>();

app.get('/api/chat/start', async (req, res) => {
const streamId = crypto.randomUUID();
const state = { tokens: [], finished: false };
streams.set(streamId, state);

// 异步生成
generateAndStore(streamId, req.query.message);

res.json({ streamId });
});

app.get('/api/chat/stream/:id', async (req, res) => {
const { id } = req.params;
const sinceToken = parseInt(req.query.since as string) || 0;
const state = streams.get(id);

// 发已有 tokens
for (let i = sinceToken; i < state.tokens.length; i++) {
res.write(`data: ${JSON.stringify({ index: i, ...state.tokens[i] })}\n\n`);
}

// 等新 tokens(如果还在生成)
// ... 用 EventEmitter / Redis pub-sub
});

// 客户端
function reconnect(streamId: string, lastTokenIndex: number) {
const es = new EventSource(`/api/chat/stream/${streamId}?since=${lastTokenIndex}`);
// ...
}

复杂但用户体验好。Vercel AI SDK v3 已经在做这个方向。

4.4 重试策略

如果断了不能续:

async function chatWithRetry(message: string) {
const MAX_RETRIES = 3;

for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
try {
return await chat(message);
} catch (error) {
if (isRetryable(error)) {
await sleep(1000 * 2 ** attempt); // 指数退避
continue;
}
throw error;
}
}

throw new Error('max_retries_exceeded');
}

function isRetryable(error: any): boolean {
if (error.code === 'ECONNRESET') return true;
if (error.code === 'ETIMEDOUT') return true;
if (error.status === 429) return true; // rate limit
if (error.status >= 500) return true;
return false;
}

第五章:用户中断

5.1 中断的两种含义

1. 取消生成:用户不想要这个回答

  • 实现:客户端 abort fetch,服务端关闭 LLM stream

2. 打断并新输入:用户想换个问题

  • 实现:先中断,再发新请求

5.2 客户端实现

const abortController = new AbortController();

const response = await fetch('/api/chat', {
signal: abortController.signal,
// ...
});

// 用户点"停止"
function userStop() {
abortController.abort();
}

5.3 服务端实现

app.post('/api/chat', async (req, res) => {
const stream = await llm.stream(req.body);

// 监听客户端断开
req.on('close', () => {
stream.controller?.abort(); // 关闭 LLM 调用
console.log('client disconnected, aborted llm');
});

for await (const chunk of stream) {
if (res.destroyed) break; // 已经断开
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}

res.end();
});

关键:客户端断开后必须真的停止 LLM 调用,否则继续烧 token。


第六章:边缘部署

6.1 为什么 AI 应用适合 Edge

  • 用户和 LLM API 之间加 Edge 层
  • 减少延迟(Edge 节点离用户近)
  • Streaming 在 Edge 自然支持
  • Serverless 自动扩展

6.2 Vercel Edge Function

// app/api/chat/route.ts
export const runtime = 'edge';

export async function POST(req: Request) {
const { messages } = await req.json();

const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages,
stream: true,
});

// 直接 pipe
return new Response(stream.toReadableStream(), {
headers: { 'Content-Type': 'text/event-stream' }
});
}

优点:

  • 部署到全球 100+ 节点
  • 冷启动 < 100ms
  • 自动扩展
  • 不收费 / 用量低成本

限制:

  • 执行时间(Vercel 有 maxDuration)
  • 只能用 Web 标准 API(不能用 Node-only 库)
  • WebSocket 支持有限

6.3 Cloudflare Workers AI

更激进:连 LLM 都跑在 Edge。

export default {
async fetch(request: Request, env: Env) {
const { messages } = await request.json();

// 用 Cloudflare 的 Workers AI 跑模型
const response = await env.AI.run('@cf/meta/llama-3-8b-instruct', {
messages,
stream: true,
});

return new Response(response, {
headers: { 'Content-Type': 'text/event-stream' }
});
}
};

适合:用开源模型、对延迟极敏感、成本敏感。

6.4 Edge 的限制

不是所有场景都该 Edge:

  • 需要数据库连接(需要 connection pooling)
  • 需要 Node.js 特定 API(fs、child_process)
  • 长任务(> 5 分钟)
  • 需要 WebSocket(部分平台支持有限)

第七章:实战架构

7.1 简单聊天应用

浏览器 (React + AI SDK)
↓ POST /api/chat (SSE)
Vercel Edge Function
↓ Anthropic SDK
Anthropic API

特点:

  • 全 SSE
  • 无状态(每次请求带完整 history)
  • Edge 部署

7.2 带工具调用的 Agent

浏览器
↓ SSE
Edge Function
↓ 调 LLM(流式)
LLM
↓ 决定调工具
Edge Function (执行工具)
↓ 把结果加进 messages,再调 LLM
LLM 继续生成
↓ 流回客户端

关键:在一个连接里完成多轮 LLM ↔ Tool 互动。Vercel AI SDK 内置这个 loop。

7.3 长任务(异步)

浏览器
↓ POST /api/task/start
Edge Function
↓ 入队(Redis / SQS)
Worker(长任务,跑几分钟到几小时)
↓ 进度更新到 KV
浏览器
← SSE /api/task/:id/stream
Worker(写完成结果到 KV)

适合:研究 Agent、批量处理、Codex Cloud 类。

7.4 多人协作场景

浏览器 A 浏览器 B
↓ WS ↓ WS
WebSocket Hub(Cloudflare Durable Object)
↓ 转发
LLM 调用 + 流式给所有连接的客户端

适合:协同编辑、群聊 Agent。


第八章:监控与调试

8.1 关键指标

指标健康范围
Time to first token (TTFT)< 1s
Token rate> 30 tokens/s
总耗时< 30s(普通对话)
断线率< 1%
中断后续连成功率> 95%(如果实现了续连)

8.2 调试工具

Chrome DevTools

  • Network 面板看 text/event-stream
  • 可以看每个 SSE 事件

Vercel AI SDK DevTools

  • 内置 debug overlay
  • 看每个 stream event
  • 看 tool 调用顺序

8.3 服务端日志

async function* loggedStream(stream: AsyncIterable<any>) {
let tokenCount = 0;
let firstTokenTime: number | null = null;
const start = Date.now();

for await (const chunk of stream) {
if (firstTokenTime === null) {
firstTokenTime = Date.now();
logger.info('TTFT', { ms: firstTokenTime - start });
}
tokenCount++;
yield chunk;
}

logger.info('stream complete', {
totalMs: Date.now() - start,
tokenCount,
rate: tokenCount / ((Date.now() - start) / 1000)
});
}

第九章:踩坑总结

9.1 常见反模式

反模式后果修正
不批量 setState50fps 卡顿requestAnimationFrame
整个对话一个 React 组件流式时全列表重渲染拆子组件,按 id 订阅
JSON.parse 流式 partial抛错用 partial-json 库
客户端断了不取消 LLM烧 tokenreq.on('close') 取消
用 WebSocket 但不需要双向复杂度高用 SSE
不处理 markdown 流式渲染半成品代码块出错用 Streamdown
Edge 用 Node-only 库部署失败看 SDK 是否支持 Edge runtime
长任务用 Edge超时用 Worker + 异步

9.2 性能调优清单

  • TTFT < 1s(首 token 时间)
  • Token rate > 30/s
  • React 渲染 > 30 FPS
  • 断线自动重试(至少 1 次)
  • 用户中断生效 < 100ms
  • 工具调用展示流式参数
  • Markdown 渲染处理半成品
  • 监控 TTFT、token rate、断线率

第十章:未来方向

10.1 Resumable Streaming

Vercel 在推 resumable-stream。让流可以断点续传,类似视频播放。

2026 年开始有标准化方案。

10.2 双向流 + Tool Use

OpenAI Realtime API 已经做了——文本 + 音频 + 工具调用全在一个 WebSocket。

下一步:标准化、跨厂商兼容。

10.3 Server Components Streaming

React Server Components + AI 流式:

// 服务端组件直接流式 LLM 输出
async function AIResponse({ message }: { message: string }) {
const stream = await llm.stream(message);
return <StreamingText stream={stream} />;
}

减少客户端 JS,更快首屏。

10.4 边缘原生 LLM

Cloudflare Workers AI、Vercel AI Gateway —— LLM 推理本身在 Edge。

挑战:模型 size 大,边缘节点存储有限。但 1B-3B 小模型已经可以全 Edge 跑。


权威资料

核对日期:2026-06-12