logo
OpenAI API 开发指南
AI Engineer

OpenAI API 开发指南

OpenAI API 是最广泛使用的 AI API 之一,提供 GPT-4、DALL-E、Whisper 等模型的访问。

OpenAI API 开发指南流式响应

OpenAI API 流式响应

流式响应(Streaming)让你可以实时接收 AI 的输出,而不是等待完整回复。这对于提升用户体验和构建聊天应用至关重要。

#为什么使用流式响应?

#普通模式 vs 流式模式

普通模式:
用户发送 → [等待 3-5 秒] → 一次性返回完整回复

流式模式:
用户发送 → 立即开始返回 → 逐字显示 → 完成

#优势

特性普通模式流式模式
首字延迟3-5 秒< 1 秒
用户体验等待感强即时反馈
长回复可能超时稳定传输
适用场景API 调用聊天界面

#Python 实现

#基础流式

python
from openai import OpenAI client = OpenAI() stream = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": "讲一个简短的故事"}], stream=True # 启用流式 ) for chunk in stream: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="", flush=True)

#使用上下文管理器

python
with client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": "Hello"}], stream=True ) as stream: for chunk in stream: content = chunk.choices[0].delta.content or "" print(content, end="", flush=True)

#收集完整响应

python
def stream_chat(messages: list) -> str: """流式输出并返回完整响应""" client = OpenAI() full_response = "" stream = client.chat.completions.create( model="gpt-4o", messages=messages, stream=True ) for chunk in stream: content = chunk.choices[0].delta.content or "" print(content, end="", flush=True) full_response += content print() # 换行 return full_response # 使用 response = stream_chat([ {"role": "user", "content": "用三句话介绍 Python"} ]) print(f"\n完整响应长度: {len(response)}")

#Node.js 实现

#基础流式

typescript
import OpenAI from 'openai'; const client = new OpenAI(); async function streamChat() { const stream = await client.chat.completions.create({ model: 'gpt-4o', messages: [{ role: 'user', content: '讲一个简短的故事' }], stream: true }); for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; process.stdout.write(content); } } streamChat();

#使用 Stream Helper

typescript
import OpenAI from 'openai'; const client = new OpenAI(); async function streamWithHelper() { const stream = client.beta.chat.completions.stream({ model: 'gpt-4o', messages: [{ role: 'user', content: 'Hello' }] }); // 事件监听 stream.on('content', (delta, snapshot) => { process.stdout.write(delta); }); // 获取最终结果 const finalResponse = await stream.finalChatCompletion(); console.log('\n完成:', finalResponse.choices[0].message.content); }

#Web 应用集成

#FastAPI + SSE

python
from fastapi import FastAPI from fastapi.responses import StreamingResponse from openai import OpenAI app = FastAPI() client = OpenAI() async def generate_stream(prompt: str): stream = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], stream=True ) for chunk in stream: content = chunk.choices[0].delta.content or "" if content: yield f"data: {content}\n\n" yield "data: [DONE]\n\n" @app.get("/chat") async def chat(prompt: str): return StreamingResponse( generate_stream(prompt), media_type="text/event-stream" )

#前端接收 (React)

typescript
async function streamChat(prompt: string, onChunk: (text: string) => void) { const response = await fetch(`/chat?prompt=${encodeURIComponent(prompt)}`); const reader = response.body?.getReader(); const decoder = new TextDecoder(); while (reader) { const { done, value } = await reader.read(); if (done) break; const text = decoder.decode(value); const lines = text.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const content = line.slice(6); if (content !== '[DONE]') { onChunk(content); } } } } } // React 组件中使用 function ChatComponent() { const [response, setResponse] = useState(''); const handleSend = async () => { setResponse(''); await streamChat('你的问题', (chunk) => { setResponse(prev => prev + chunk); }); }; return ( <div> <button onClick={handleSend}>发送</button> <div>{response}</div> </div> ); }

#Next.js API Route

typescript
// app/api/chat/route.ts import { OpenAI } from 'openai'; const client = new OpenAI(); export async function POST(req: Request) { const { messages } = await req.json(); const stream = await client.chat.completions.create({ model: 'gpt-4o', messages, stream: true }); const encoder = new TextEncoder(); const readable = new ReadableStream({ async start(controller) { for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; controller.enqueue(encoder.encode(content)); } controller.close(); } }); return new Response(readable, { headers: { 'Content-Type': 'text/plain; charset=utf-8' } }); }

#Chunk 数据结构

python
# 每个 chunk 的结构 { "id": "chatcmpl-xxx", "object": "chat.completion.chunk", "created": 1234567890, "model": "gpt-4o", "choices": [{ "index": 0, "delta": { "content": "你" # 增量内容 }, "finish_reason": null # 最后一个为 "stop" }] }

#处理完成状态

python
for chunk in stream: delta = chunk.choices[0].delta finish_reason = chunk.choices[0].finish_reason if delta.content: print(delta.content, end="") if finish_reason == "stop": print("\n[完成]") elif finish_reason == "length": print("\n[达到长度限制]")

#流式 + Function Calling

python
stream = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": "北京天气怎么样?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "parameters": {...} } }], stream=True ) tool_calls = [] for chunk in stream: delta = chunk.choices[0].delta # 处理工具调用 if delta.tool_calls: for tc in delta.tool_calls: if tc.index >= len(tool_calls): tool_calls.append({"name": "", "arguments": ""}) if tc.function.name: tool_calls[tc.index]["name"] = tc.function.name if tc.function.arguments: tool_calls[tc.index]["arguments"] += tc.function.arguments # 处理文本内容 if delta.content: print(delta.content, end="") print(f"\n工具调用: {tool_calls}")

#错误处理

python
from openai import APIError, APIConnectionError try: stream = client.chat.completions.create( model="gpt-4o", messages=[...], stream=True ) for chunk in stream: # 处理 chunk pass except APIConnectionError: print("连接失败,请检查网络") except APIError as e: print(f"API 错误: {e}") except Exception as e: print(f"未知错误: {e}")

#性能优化

#设置超时

python
from openai import OpenAI import httpx client = OpenAI( timeout=httpx.Timeout(60.0, connect=5.0) )

#重试机制

python
from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) def stream_with_retry(messages): return client.chat.completions.create( model="gpt-4o", messages=messages, stream=True )

#下一步


提示:流式响应是构建聊天应用的核心技术,务必掌握。

1v1免费职业咨询