OpenAI API Streaming
I re-checked this page against OpenAI's official streaming guide. The path the docs now recommend for new projects is to set `stream=true` on the Responses API and receive incremental events over server-sent events (SSE).

## The current picture

The official streaming guide currently says:

- The Responses API streams directly with `stream=True`; under the hood this is HTTP streaming over server-sent events (SSE)
- For older projects, Chat Completions still supports `stream=True`

So the safer way to frame it:

- New projects: start with Responses streaming
- Existing projects: Chat Completions streaming remains supported, so you can keep maintaining it
## Responses API streaming example

Node.js:

```javascript
import { OpenAI } from "openai";

const client = new OpenAI();

const stream = await client.responses.create({
  model: "gpt-5",
  input: [
    {
      role: "user",
      content: "Say 'double bubble bath' ten times fast.",
    },
  ],
  stream: true,
});

for await (const event of stream) {
  console.log(event);
}
```
Python:

```python
from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5",
    input=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for event in stream:
    print(event)
```
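Each event carries a `type` field; for plain text output you usually keep only the text deltas (type `response.output_text.delta` in the Responses streaming event format) and ignore lifecycle events. A minimal sketch of that filtering, run here against simulated events rather than a live stream:

```python
from types import SimpleNamespace

def collect_output_text(events):
    # Keep only text-delta events; other event types (created, completed,
    # tool-call deltas, ...) are skipped here.
    parts = []
    for event in events:
        if event.type == "response.output_text.delta":
            parts.append(event.delta)
    return "".join(parts)

# Simulated stand-ins for what a live `client.responses.create(stream=True)`
# iterator would yield.
fake_events = [
    SimpleNamespace(type="response.created"),
    SimpleNamespace(type="response.output_text.delta", delta="double "),
    SimpleNamespace(type="response.output_text.delta", delta="bubble bath"),
    SimpleNamespace(type="response.completed"),
]
print(collect_output_text(fake_events))  # double bubble bath
```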
## Chat Completions streaming still works

The official guide also keeps the Chat Completions streaming pattern, so if you maintain an older project you don't need to rewrite the whole pipeline just because of streaming.
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-5",
    messages=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for chunk in stream:
    print(chunk)
```
## A more practical reminder

Don't think of streaming as merely "flashier." Where it actually earns its keep:

- The first characters appear sooner
- Long replies don't feel frozen to the user
- The frontend can render as chunks arrive

But that also means extra handling on both ends:

- Interruption
- Reconnection
- Incremental rendering on the frontend
- A final completion state
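Cancellation is the easiest of these to forget: if the user navigates away or a budget is hit, you should stop iterating and release the connection. A sketch, assuming the SDK stream object exposes `close()` (openai-python's `Stream` does); demonstrated with a fake stream instead of a live call:

```python
def read_with_cancel(stream, max_chars=500):
    # Stop consuming once a character budget is hit (a stand-in for a
    # user-initiated cancel), and always close the stream so the
    # underlying connection is released.
    received = []
    try:
        for piece in stream:
            received.append(piece)
            if sum(len(p) for p in received) >= max_chars:
                break
    finally:
        stream.close()
    return "".join(received)

class FakeStream:
    """Minimal stand-in for an SDK stream: iterable + close()."""
    def __init__(self, pieces):
        self.pieces = pieces
        self.closed = False
    def __iter__(self):
        return iter(self.pieces)
    def close(self):
        self.closed = True

s = FakeStream(["hello ", "world ", "again"])
print(read_with_cancel(s, max_chars=10))  # "hello world " (stops early)
print(s.closed)  # True
```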
## Official references

- Streaming responses guide: https://developers.openai.com/api/docs/guides/streaming-responses
- Responses API: https://platform.openai.com/docs/api-reference/responses
## Chunk structure

Each Chat Completions chunk looks like this:

```json
{
  "id": "chatcmpl-xxx",
  "object": "chat.completion.chunk",
  "created": 1234567890,
  "model": "gpt-5.2",
  "choices": [{
    "index": 0,
    "delta": {
      "content": "你"
    },
    "finish_reason": null
  }]
}
```

`delta.content` carries the incremental text; `finish_reason` stays `null` until the final chunk, where it becomes `"stop"` (or another terminal reason such as `"length"`).
## Handling the completion state

```python
for chunk in stream:
    delta = chunk.choices[0].delta
    finish_reason = chunk.choices[0].finish_reason
    if delta.content:
        print(delta.content, end="")
    if finish_reason == "stop":
        print("\n[done]")
    elif finish_reason == "length":
        print("\n[hit length limit]")
```
## Common pitfalls

- Handling only the incremental content while ignoring `finish_reason`, which derails your state machine
- Concatenating chunks directly on the frontend with no reconnect or dedup handling
- No timeout or cancellation control, so long requests pin connections open
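For the reconnect/dedup pitfall, one simple scheme is to track how many characters have already been rendered and, after reconnecting, drop any overlapping prefix the replayed stream resends. This replay-from-start behavior is a hypothetical scenario for illustration, not an API guarantee; the offset bookkeeping is the point:

```python
def merge_replay(rendered, replayed_chunks):
    # After a reconnect, the replayed stream may repeat text we already
    # rendered; keep a character offset and append only what's new.
    offset = len(rendered)
    seen = 0
    out = rendered
    for chunk in replayed_chunks:
        end = seen + len(chunk)
        if end > offset:
            out += chunk[max(0, offset - seen):]
        seen = end
    return out

print(merge_replay("Hello, wo", ["Hello", ", world", "!"]))  # Hello, world!
```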
## Streaming + function calling

Tool-call deltas arrive in fragments just like text, so the name and the JSON arguments have to be accumulated per index:

```python
stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "What's the weather in Beijing?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {...}
        }
    }],
    stream=True
)

tool_calls = []
for chunk in stream:
    if not chunk.choices:  # some chunks carry no choices
        continue
    delta = chunk.choices[0].delta
    # Accumulate tool-call fragments by index
    if delta.tool_calls:
        for tc in delta.tool_calls:
            while tc.index >= len(tool_calls):
                tool_calls.append({"name": "", "arguments": ""})
            if tc.function.name:
                tool_calls[tc.index]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[tc.index]["arguments"] += tc.function.arguments
    # Text content, if any
    if delta.content:
        print(delta.content, end="")

print(f"\ntool calls: {tool_calls}")
```
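Once the stream ends, each accumulated `arguments` string should be complete JSON, so it can be parsed and dispatched. A minimal sketch; the `registry` mapping and the local `get_weather` implementation are hypothetical stand-ins, not part of the SDK:

```python
import json

def dispatch_tool_calls(tool_calls, registry):
    # Arguments accumulated during streaming are only parseable as JSON
    # after the final fragment has arrived; parse and call the handler.
    results = []
    for call in tool_calls:
        args = json.loads(call["arguments"])
        results.append(registry[call["name"]](**args))
    return results

def get_weather(city):  # hypothetical local handler
    return f"{city}: sunny"

print(dispatch_tool_calls(
    [{"name": "get_weather", "arguments": '{"city": "Beijing"}'}],
    {"get_weather": get_weather},
))  # ['Beijing: sunny']
```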
## Error handling

```python
from openai import APIError, APIConnectionError

try:
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[...],
        stream=True
    )
    for chunk in stream:
        # process chunk
        pass
except APIConnectionError:
    print("Connection failed; check the network")
except APIError as e:
    print(f"API error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
## Performance tuning

Set a timeout:

```python
import httpx
from openai import OpenAI

client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)
)
```
Retries:

```python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def stream_with_retry(messages):
    return client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )
```
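One caveat with this retry shape: it only retries *creating* the stream. If the connection drops mid-iteration, blindly restarting would duplicate output the user has already seen. A sketch of a wrapper that restarts only when nothing has been delivered yet, demonstrated with a fake stream factory rather than a live call:

```python
import time

def stream_with_restart(make_stream, attempts=3, backoff=1.0):
    # Restart only if the failure happened before any chunk was delivered;
    # a mid-stream failure is re-raised for the caller to handle (e.g. by
    # surfacing a partial-response state to the UI).
    for attempt in range(attempts):
        delivered = False
        try:
            for chunk in make_stream():
                delivered = True
                yield chunk
            return
        except ConnectionError:
            if delivered or attempt == attempts - 1:
                raise
            time.sleep(backoff * (2 ** attempt))

# Fake factory: fails once before yielding anything, then succeeds.
calls = {"n": 0}
def make_stream():
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("dropped before first chunk")
    return iter(["chunk-1", "chunk-2"])

print(list(stream_with_restart(make_stream, backoff=0)))  # ['chunk-1', 'chunk-2']
```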
## Next steps

- Function Calling: letting the model call your functions
- Embeddings: turning text into vectors
- Vision: image understanding

Tip: streaming responses are a core technique for building chat applications, so it's worth mastering them.