logo

OpenAI API Streaming

这页我按 OpenAI 官方 Streaming API responses 指南重新对过了。当前官方最推荐的新项目路径,是直接在 Responses API 上开 stream=true,通过 SSE 增量接收事件。

先记住当前主线

OpenAI 官方 streaming guide 现在明确写的是:

  • Responses API 可以直接 stream=True
  • 底层是 HTTP streaming over server-sent events (SSE)
  • 如果是旧项目,Chat Completions 也还支持 stream=True

所以更稳的理解是:

  • 新项目:先用 Responses streaming
  • 老项目:可以继续维护 Chat Completions streaming

Responses API 流式示例

Node.js

import { OpenAI } from "openai";

const client = new OpenAI();

const stream = await client.responses.create({
  model: "gpt-5",
  input: [
    {
      role: "user",
      content: "Say 'double bubble bath' ten times fast.",
    },
  ],
  stream: true,
});

for await (const event of stream) {
  console.log(event);
}

Python

from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5",
    input=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for event in stream:
    print(event)

Chat Completions 旧流式还在

官方 guide 也保留了 Chat Completions 的流式写法,所以如果你在维护旧项目,不用因为 streaming 立刻重写整条链路。

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-5",
    messages=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for chunk in stream:
    print(chunk)

一个更实用的提醒

别把 streaming 只理解成“更炫”。它真正有用的地方是:

  • 首字更快出现
  • 长回复时用户不会觉得卡死
  • 前端可以边收边渲染

但这也意味着你要在前后端额外处理:

  • 中断
  • 重连
  • 前端增量渲染
  • 最终完成状态

官方参考


## Chunk 数据结构

```python
# 每个 chunk 的结构
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-5.2",
    "choices": [{
        "index": 0,
        "delta": {
            "content": "你"  # 增量内容
        },
        "finish_reason": null  # 最后一个为 "stop"
    }]
}

处理完成状态

for chunk in stream:
    delta = chunk.choices[0].delta
    finish_reason = chunk.choices[0].finish_reason

    if delta.content:
        print(delta.content, end="")

    if finish_reason == "stop":
        print("\n[完成]")
    elif finish_reason == "length":
        print("\n[达到长度限制]")

常见坑

  • 只处理增量内容,不处理 finish_reason 导致状态机错乱
  • 前端直接拼接 chunk,不做断线重试和去重
  • 没有超时与取消控制,长请求占满连接资源

流式 + Function Calling

stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "北京天气怎么样?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {...}
        }
    }],
    stream=True
)

tool_calls = []
for chunk in stream:
    delta = chunk.choices[0].delta

    # 处理工具调用
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"name": "", "arguments": ""})
            if tc.function.name:
                tool_calls[tc.index]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[tc.index]["arguments"] += tc.function.arguments

    # 处理文本内容
    if delta.content:
        print(delta.content, end="")

print(f"\n工具调用: {tool_calls}")

错误处理

from openai import APIError, APIConnectionError

try:
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[...],
        stream=True
    )

    for chunk in stream:
        # 处理 chunk
        pass

except APIConnectionError:
    print("连接失败,请检查网络")
except APIError as e:
    print(f"API 错误: {e}")
except Exception as e:
    print(f"未知错误: {e}")

性能优化

设置超时

from openai import OpenAI
import httpx

client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)
)

重试机制

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def stream_with_retry(messages):
    return client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )

下一步


提示:流式响应是构建聊天应用的核心技术,务必掌握。

OpenAI API 开发指南
AI Engineer

OpenAI API 开发指南

OpenAI API 是最广泛使用的 AI API 之一,提供 GPT-4、DALL-E、Whisper 等模型的访问。

OpenAI API 开发指南流式响应

OpenAI API Streaming

这页我按 OpenAI 官方 Streaming API responses 指南重新对过了。当前官方最推荐的新项目路径,是直接在 Responses API 上开 stream=true,通过 SSE 增量接收事件。

#先记住当前主线

OpenAI 官方 streaming guide 现在明确写的是:

  • Responses API 可以直接 stream=True
  • 底层是 HTTP streaming over server-sent events (SSE)
  • 如果是旧项目,Chat Completions 也还支持 stream=True

所以更稳的理解是:

  • 新项目:先用 Responses streaming
  • 老项目:可以继续维护 Chat Completions streaming

#Responses API 流式示例

#Node.js

ts
import { OpenAI } from "openai"; const client = new OpenAI(); const stream = await client.responses.create({ model: "gpt-5", input: [ { role: "user", content: "Say 'double bubble bath' ten times fast.", }, ], stream: true, }); for await (const event of stream) { console.log(event); }

#Python

python
from openai import OpenAI client = OpenAI() stream = client.responses.create( model="gpt-5", input=[ { "role": "user", "content": "Say 'double bubble bath' ten times fast.", }, ], stream=True, ) for event in stream: print(event)

#Chat Completions 旧流式还在

官方 guide 也保留了 Chat Completions 的流式写法,所以如果你在维护旧项目,不用因为 streaming 立刻重写整条链路。

python
from openai import OpenAI client = OpenAI() stream = client.chat.completions.create( model="gpt-5", messages=[ { "role": "user", "content": "Say 'double bubble bath' ten times fast.", }, ], stream=True, ) for chunk in stream: print(chunk)

#一个更实用的提醒

别把 streaming 只理解成“更炫”。它真正有用的地方是:

  • 首字更快出现
  • 长回复时用户不会觉得卡死
  • 前端可以边收边渲染

但这也意味着你要在前后端额外处理:

  • 中断
  • 重连
  • 前端增量渲染
  • 最终完成状态

#官方参考


## Chunk 数据结构

```python
# 每个 chunk 的结构
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-5.2",
    "choices": [{
        "index": 0,
        "delta": {
            "content": "你"  # 增量内容
        },
        "finish_reason": null  # 最后一个为 "stop"
    }]
}

#处理完成状态

python
for chunk in stream: delta = chunk.choices[0].delta finish_reason = chunk.choices[0].finish_reason if delta.content: print(delta.content, end="") if finish_reason == "stop": print("\n[完成]") elif finish_reason == "length": print("\n[达到长度限制]")

#常见坑

  • 只处理增量内容,不处理 finish_reason 导致状态机错乱
  • 前端直接拼接 chunk,不做断线重试和去重
  • 没有超时与取消控制,长请求占满连接资源

#流式 + Function Calling

python
stream = client.chat.completions.create( model="gpt-5.2", messages=[{"role": "user", "content": "北京天气怎么样?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "parameters": {...} } }], stream=True ) tool_calls = [] for chunk in stream: delta = chunk.choices[0].delta # 处理工具调用 if delta.tool_calls: for tc in delta.tool_calls: if tc.index >= len(tool_calls): tool_calls.append({"name": "", "arguments": ""}) if tc.function.name: tool_calls[tc.index]["name"] = tc.function.name if tc.function.arguments: tool_calls[tc.index]["arguments"] += tc.function.arguments # 处理文本内容 if delta.content: print(delta.content, end="") print(f"\n工具调用: {tool_calls}")

#错误处理

python
from openai import APIError, APIConnectionError try: stream = client.chat.completions.create( model="gpt-5.2", messages=[...], stream=True ) for chunk in stream: # 处理 chunk pass except APIConnectionError: print("连接失败,请检查网络") except APIError as e: print(f"API 错误: {e}") except Exception as e: print(f"未知错误: {e}")

#性能优化

#设置超时

python
from openai import OpenAI import httpx client = OpenAI( timeout=httpx.Timeout(60.0, connect=5.0) )

#重试机制

python
from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) def stream_with_retry(messages): return client.chat.completions.create( model="gpt-5.2", messages=messages, stream=True )

#下一步


提示:流式响应是构建聊天应用的核心技术,务必掌握。

System Design

系统设计必备:核心概念 + 经典案例

快速掌握取舍与设计套路,备战系统设计面试。

进入 System Design →

相关路线图