
OpenAI API Streaming

I've re-checked this page against OpenAI's official "Streaming API responses" guide. For new projects, the path the docs currently recommend is to set stream=true directly on the Responses API and receive incremental events over SSE.

First, the current mainline

OpenAI's official streaming guide now states explicitly:

  • The Responses API supports stream=True directly
  • Under the hood it is HTTP streaming over server-sent events (SSE)
  • For legacy projects, Chat Completions still supports stream=True

So the safer mental model is:

  • New projects: start with Responses streaming
  • Legacy projects: you can keep maintaining Chat Completions streaming
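Since the transport is SSE, each event arrives on the wire as one or more `data:` lines terminated by a blank line. A minimal sketch of parsing that framing (the payloads below are simplified, hypothetical stand-ins for real Responses API events, not actual API output):

```python
import json

def parse_sse(lines):
    """Parse SSE 'data:' lines into JSON events; a blank line ends each event."""
    buffer = []
    for line in lines:
        if line.startswith("data:"):
            buffer.append(line[len("data:"):].strip())
        elif line == "" and buffer:
            payload = "\n".join(buffer)
            buffer = []
            if payload != "[DONE]":  # some endpoints use a sentinel to end the stream
                yield json.loads(payload)

# Simulated wire data, not real API output:
raw = [
    'data: {"type": "response.output_text.delta", "delta": "Hel"}',
    "",
    'data: {"type": "response.output_text.delta", "delta": "lo"}',
    "",
    "data: [DONE]",
    "",
]
events = list(parse_sse(raw))
text = "".join(e["delta"] for e in events)
```

In practice the official SDKs do this parsing for you; the sketch only shows what "HTTP streaming over SSE" means at the protocol level.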

Responses API streaming examples

Node.js

import { OpenAI } from "openai";

const client = new OpenAI();

const stream = await client.responses.create({
  model: "gpt-5",
  input: [
    {
      role: "user",
      content: "Say 'double bubble bath' ten times fast.",
    },
  ],
  stream: true,
});

for await (const event of stream) {
  console.log(event);
}

Python

from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-5",
    input=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for event in stream:
    print(event)

Chat Completions streaming is still there

The official guide still documents the Chat Completions streaming style, so if you maintain a legacy project you don't need to rewrite the whole pipeline just for streaming.

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-5",
    messages=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times fast.",
        },
    ],
    stream=True,
)

for chunk in stream:
    print(chunk)

A more practical reminder

Don't think of streaming as merely "flashier". Where it actually helps:

  • The first token appears sooner
  • Long responses don't feel frozen to the user
  • The frontend can render as it receives

But it also means extra handling on both the frontend and the backend:

  • Interruption
  • Reconnection
  • Incremental rendering on the frontend
  • The final completion state
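The four extra concerns above can be modeled as a small state machine on the consumer side. A hedged sketch (the event handling is deliberately simplified; the real SDK's event types and callbacks differ):

```python
class StreamView:
    """Accumulates incremental deltas and tracks the stream's terminal state."""

    def __init__(self):
        self.text = ""
        self.status = "streaming"  # streaming | done | interrupted

    def on_delta(self, piece):
        # incremental rendering would happen here, chunk by chunk
        if self.status == "streaming":
            self.text += piece

    def on_done(self):
        # final completion state: safe to persist / enable UI actions
        self.status = "done"

    def on_disconnect(self):
        # interruption: mark it, so a reconnect can decide what to resend
        if self.status != "done":
            self.status = "interrupted"

view = StreamView()
for piece in ["Str", "eam", "ing"]:
    view.on_delta(piece)
view.on_done()
```

Keeping this state explicit is what lets the frontend distinguish "still coming", "finished", and "dropped mid-way" instead of just concatenating text forever.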

Official references


Chunk data structure

# Structure of each chunk (shown as a Python dict)
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-5",
    "choices": [{
        "index": 0,
        "delta": {
            "content": "Hi"  # incremental content
        },
        "finish_reason": None  # "stop" on the final chunk
    }]
}

Handling completion state

for chunk in stream:
    delta = chunk.choices[0].delta
    finish_reason = chunk.choices[0].finish_reason

    if delta.content:
        print(delta.content, end="")

    if finish_reason == "stop":
        print("\n[done]")
    elif finish_reason == "length":
        print("\n[hit length limit]")

Common pitfalls

  • Handling only the incremental content and ignoring finish_reason, which corrupts your state machine
  • Concatenating chunks directly on the frontend with no reconnect/retry or deduplication
  • No timeout or cancellation control, so long requests exhaust connection resources
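For the timeout-and-cancellation pitfall, one simple approach is a wall-clock deadline on the consumption loop; with a live SDK stream you would additionally close the stream to release the connection. A sketch using a fake generator instead of a network stream (the names and deadline value are illustrative):

```python
import time

def consume_with_deadline(stream, deadline_seconds, on_piece):
    """Consume until done or the deadline passes; returns True if it finished."""
    start = time.monotonic()
    for piece in stream:
        if time.monotonic() - start > deadline_seconds:
            # cancelled: with a real SDK stream, also close it here
            return False
        on_piece(piece)
    return True

def fake_stream(n, delay=0.0):
    """Stand-in for a live stream; yields n chunks, optionally slowly."""
    for i in range(n):
        time.sleep(delay)
        yield f"chunk-{i}"

received = []
finished = consume_with_deadline(fake_stream(3), 5.0, received.append)
```

The same pattern works for user-initiated cancellation: replace the time check with a check on a shared "cancelled" flag.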

Streaming + function calling

stream = client.chat.completions.create(
    model="gpt-5",
    messages=[{"role": "user", "content": "What's the weather in Beijing?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {...}
        }
    }],
    stream=True
)

tool_calls = []
for chunk in stream:
    delta = chunk.choices[0].delta

    # accumulate tool-call deltas
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"name": "", "arguments": ""})
            if tc.function.name:
                tool_calls[tc.index]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[tc.index]["arguments"] += tc.function.arguments

    # handle text content
    if delta.content:
        print(delta.content, end="")

print(f"\nTool calls: {tool_calls}")
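Once the stream ends, each accumulated `arguments` string is complete JSON and can be parsed and dispatched to your own handler. A sketch with a hand-built list in the same shape the loop above produces (the `get_weather` handler here is hypothetical):

```python
import json

def dispatch(tool_calls, handlers):
    """Parse each accumulated arguments string and call the matching handler."""
    results = []
    for call in tool_calls:
        args = json.loads(call["arguments"]) if call["arguments"] else {}
        results.append(handlers[call["name"]](**args))
    return results

# Hypothetical local handler standing in for a real weather lookup:
handlers = {"get_weather": lambda city: f"sunny in {city}"}

# Same shape as the accumulation loop's output:
tool_calls = [{"name": "get_weather", "arguments": '{"city": "Beijing"}'}]
results = dispatch(tool_calls, handlers)
```

The key point is that `arguments` is only valid JSON after the final chunk; parsing it mid-stream will raise on the partial string.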

Error handling

from openai import APIError, APIConnectionError

try:
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[...],
        stream=True
    )

    for chunk in stream:
        # 处理 chunk
        pass

except APIConnectionError:
    print("连接失败,请检查网络")
except APIError as e:
    print(f"API 错误: {e}")
except Exception as e:
    print(f"未知错误: {e}")

Performance tuning

Set timeouts

from openai import OpenAI
import httpx

client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)
)

Retry mechanism

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def stream_with_retry(messages):
    return client.chat.completions.create(
        model="gpt-5",
        messages=messages,
        stream=True
    )
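Note that the decorator above only retries *creating* the stream; a failure halfway through iteration is not covered, because the iteration happens outside the decorated function. If you want the whole consume loop retried, wrap creation and consumption as one unit. A stdlib-only sketch with a fake stream factory (no network; names are illustrative):

```python
def consume_all(make_stream, max_attempts=3):
    """Create and fully consume a stream as one retryable unit."""
    for attempt in range(1, max_attempts + 1):
        try:
            return [piece for piece in make_stream()]
        except ConnectionError:
            # NB: a retry regenerates from scratch, so any partial output
            # already shown to the user must be discarded or deduplicated
            if attempt == max_attempts:
                raise

attempts = {"n": 0}

def flaky_stream():
    """Fails mid-stream on the first attempt, succeeds on the second."""
    attempts["n"] += 1
    yield "a"
    if attempts["n"] < 2:
        raise ConnectionError("dropped mid-stream")
    yield "b"

out = consume_all(flaky_stream)
```

The same structure works with tenacity: put both the `create(...)` call and the `for chunk in stream` loop inside the decorated function.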

Next steps


Tip: streaming responses are a core technique for building chat applications; make sure you master them.
