OpenAI API Streaming
Streaming lets you receive the model's output in real time instead of waiting for the complete reply. This is essential for good user experience and for building chat applications.
Think of it as a typewriter mode that displays text as it is written:
the user sees output before the full text has finished generating, so the experience shifts from "it's stuck" to "the system is working".
Prefer the streaming mechanism of the Responses API where you can; the examples in this article use Chat Completions, which makes them easy to compare against when migrating older projects.
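For orientation, here is a minimal Responses API streaming sketch. The event type string follows recent versions of the openai-python SDK, and the model name follows this article's examples; verify both against your setup:
from openai import OpenAI
client = OpenAI()
stream = client.responses.create(
    model="gpt-5.2",
    input="Tell a short story",
    stream=True
)
for event in stream:
    # The Responses API emits typed events rather than bare deltas
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)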
Why Use Streaming?
Normal Mode vs. Streaming Mode
Normal mode:
User sends → [waits 3-5 seconds] → the complete reply arrives all at once
Streaming mode:
User sends → the response starts immediately → text appears piece by piece → done
Advantages
| Aspect | Normal mode | Streaming mode |
|---|---|---|
| Time to first token | 3-5 s | < 1 s |
| User experience | Strong sense of waiting | Immediate feedback |
| Long replies | May time out | Delivered steadily |
| Typical scenario | Backend API calls | Chat interfaces |
Reader's Guide: Path to Production
- First get a minimal streaming path working on the backend (it emits tokens reliably).
- Then add incremental rendering on the frontend (including request cancellation and reconnection handling).
- Finally add monitoring (time to first token, completion rate, abnormal-interruption rate); see the measurement sketch after this list.
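As a starting point for the monitoring step, a minimal sketch that measures time to first token (TTFT) per request. The helper name measure_ttft is ours, and the model name follows this article's examples:
import time
from openai import OpenAI
client = OpenAI()
def measure_ttft(messages: list) -> float:
    """Return seconds from request start to the first content token."""
    start = time.monotonic()
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            return time.monotonic() - start
    return time.monotonic() - start  # stream ended without any content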
Python Implementation
Basic Streaming
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "Tell a short story"}],
    stream=True  # enable streaming
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
Using a Context Manager
with client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True
) as stream:
    # the context manager closes the underlying connection when the block exits
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        print(content, end="", flush=True)
Collecting the Full Response
def stream_chat(messages: list) -> str:
    """Stream the output while also returning the full response."""
    client = OpenAI()
    full_response = ""
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        print(content, end="", flush=True)
        full_response += content
    print()  # trailing newline
    return full_response
# Usage
response = stream_chat([
    {"role": "user", "content": "Introduce Python in three sentences"}
])
print(f"\nFull response length: {len(response)}")
Node.js Implementation
Basic Streaming
import OpenAI from 'openai';
const client = new OpenAI();
async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-5.2',
    messages: [{ role: 'user', content: 'Tell a short story' }],
    stream: true
  });
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
}
streamChat();
Using the Stream Helper
import OpenAI from 'openai';
const client = new OpenAI();
async function streamWithHelper() {
  const stream = client.beta.chat.completions.stream({
    model: 'gpt-5.2',
    messages: [{ role: 'user', content: 'Hello' }]
  });
  // subscribe to incremental content events
  stream.on('content', (delta, snapshot) => {
    process.stdout.write(delta);
  });
  // await the final, fully assembled completion
  const finalResponse = await stream.finalChatCompletion();
  console.log('\nDone:', finalResponse.choices[0].message.content);
}
Web Application Integration
FastAPI + SSE
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json
app = FastAPI()
client = AsyncOpenAI()  # async client so streaming does not block the event loop
async def generate_stream(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        if content:
            # JSON-encode each payload so newlines in content cannot break SSE framing
            yield f"data: {json.dumps(content)}\n\n"
    yield "data: [DONE]\n\n"
@app.get("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )
Receiving on the Frontend (React)
async function streamChat(prompt: string, onChunk: (text: string) => void) {
  const response = await fetch(`/chat?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across reads intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // keep the trailing partial line for the next read
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const payload = line.slice(6);
      if (payload === '[DONE]') return;
      onChunk(JSON.parse(payload)); // the server JSON-encodes each chunk
    }
  }
}
// Using it in a React component (assumes: import { useState } from 'react')
function ChatComponent() {
  const [response, setResponse] = useState('');
  const handleSend = async () => {
    setResponse('');
    await streamChat('Your question', (chunk) => {
      setResponse(prev => prev + chunk);
    });
  };
  return (
    <div>
      <button onClick={handleSend}>Send</button>
      <div>{response}</div>
    </div>
  );
}
Next.js API Route
// app/api/chat/route.ts
import { OpenAI } from 'openai';
const client = new OpenAI();
export async function POST(req: Request) {
const { messages } = await req.json();
const stream = await client.chat.completions.create({
model: 'gpt-5.2',
messages,
stream: true
});
const encoder = new TextEncoder();
const readable = new ReadableStream({
    async start(controller) {
      try {
        for await (const chunk of stream) {
          const content = chunk.choices[0]?.delta?.content || '';
          controller.enqueue(encoder.encode(content));
        }
        controller.close();
      } catch (err) {
        controller.error(err); // surface mid-stream failures instead of hanging
      }
    }
  });
});
return new Response(readable, {
headers: { 'Content-Type': 'text/plain; charset=utf-8' }
});
}
Chunk Data Structure
# Structure of each chunk
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-5.2",
    "choices": [{
        "index": 0,
        "delta": {
            "content": "Hi"  # incremental content
        },
        "finish_reason": null  # "stop" on the final chunk
    }]
}
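If you want to see the exact shape your SDK version returns, you can dump each chunk as JSON. This is a quick debugging sketch; it assumes openai-python's Pydantic v2 response models, which expose model_dump_json:
for chunk in stream:
    # print the raw chunk so you can verify fields like delta and finish_reason
    print(chunk.model_dump_json(indent=2))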
Handling the Finish State
for chunk in stream:
    delta = chunk.choices[0].delta
    finish_reason = chunk.choices[0].finish_reason
    if delta.content:
        print(delta.content, end="")
    if finish_reason == "stop":
        print("\n[done]")
    elif finish_reason == "length":
        print("\n[length limit reached]")
Common Pitfalls
- Handling only the incremental content and ignoring finish_reason, which desynchronizes your state machine
- Concatenating chunks directly on the frontend, with no reconnection retry or de-duplication
- No timeout or cancellation control, letting long requests exhaust connection resources (see the cancellation sketch below)
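For that last pitfall, a minimal cancellation sketch: stop consuming once a client-side condition is met, then release the connection. The cancel condition here is a made-up example, and Stream.close() exists in recent openai-python versions; verify against yours:
stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "Write a long essay"}],
    stream=True
)
received = 0
try:
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        received += len(content)
        if received > 2000:  # hypothetical cancel condition (e.g. user clicked stop)
            break
finally:
    stream.close()  # release the underlying connection instead of leaving it open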
Streaming + Function Calling
stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "What's the weather like in Beijing?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {...}
        }
    }],
    stream=True
)
tool_calls = []
for chunk in stream:
    delta = chunk.choices[0].delta
    # accumulate tool-call fragments: the name arrives once, the arguments in pieces
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"name": "", "arguments": ""})
            if tc.function.name:
                tool_calls[tc.index]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[tc.index]["arguments"] += tc.function.arguments
    # handle any text content
    if delta.content:
        print(delta.content, end="")
print(f"\nTool calls: {tool_calls}")
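Once the stream ends, each accumulated arguments string should be complete JSON. A short follow-up sketch to parse and dispatch them; json.loads can still raise if the stream was cut off mid-arguments, hence the guard:
import json
for call in tool_calls:
    try:
        args = json.loads(call["arguments"] or "{}")
    except json.JSONDecodeError:
        continue  # the stream was interrupted before the arguments finished
    print(f"would call {call['name']} with {args}")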
Error Handling
from openai import APIError, APIConnectionError
try:
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[...],
        stream=True
    )
    for chunk in stream:
        # process the chunk; note that errors can also be raised mid-iteration
        pass
except APIConnectionError:
    print("Connection failed; check your network")
except APIError as e:
    print(f"API error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
Performance Optimization
Setting Timeouts
from openai import OpenAI
import httpx
client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)  # 60 s overall, 5 s to establish the connection
)
Retry Mechanism
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
def stream_with_retry(messages):
return client.chat.completions.create(
model="gpt-5.2",
messages=messages,
stream=True
)
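Note that the decorator above only retries creating the request; a drop in the middle of iteration happens outside the decorated call. A sketch (reusing the tenacity imports above) that retries the whole consumption instead; the trade-off is that output restarts from scratch, so pair it with de-duplication on the display side:
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def stream_and_collect(messages) -> str:
    """Retry the request *and* its consumption; a mid-stream failure restarts it."""
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )
    return "".join(chunk.choices[0].delta.content or "" for chunk in stream)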
Next Steps
- Function Calling - let the model call functions
- Embeddings - text vectorization
- Vision - image understanding
Tip: streaming is the core technique for building chat applications; make sure you master it.