OpenAI API 流式响应
流式响应(Streaming)让你可以实时接收 AI 的输出,而不是等待完整回复。这对于提升用户体验和构建聊天应用至关重要。
## 为什么使用流式响应?
### 普通模式 vs 流式模式
普通模式:
用户发送 → [等待 3-5 秒] → 一次性返回完整回复
流式模式:
用户发送 → 立即开始返回 → 逐字显示 → 完成
### 优势
| 特性 | 普通模式 | 流式模式 |
|---|---|---|
| 首字延迟 | 3-5 秒 | < 1 秒 |
| 用户体验 | 等待感强 | 即时反馈 |
| 长回复 | 可能超时 | 稳定传输 |
| 适用场景 | API 调用 | 聊天界面 |
## Python 实现
### 基础流式
from openai import OpenAI

client = OpenAI()

# Request a streamed completion; chunks arrive as the model generates them.
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "讲一个简短的故事"}],
    stream=True,  # enable streaming
)

# Print each incremental piece the moment it arrives.
for chunk in stream:
    delta_text = chunk.choices[0].delta.content
    if delta_text:
        print(delta_text, end="", flush=True)
### 使用上下文管理器
# The stream object is a context manager: leaving the block closes the
# underlying HTTP response even if iteration stops early.
with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
) as stream:
    for chunk in stream:
        piece = chunk.choices[0].delta.content or ""
        print(piece, end="", flush=True)
### 收集完整响应
def stream_chat(messages: list) -> str:
    """Stream a chat completion to stdout and return the full reply.

    Args:
        messages: Chat history in OpenAI message format,
            e.g. [{"role": "user", "content": "..."}].

    Returns:
        The complete assistant response assembled from all chunks.
    """
    client = OpenAI()
    parts: list[str] = []
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        print(content, end="", flush=True)
        parts.append(content)
    print()  # final newline after the streamed output
    # Join once at the end: repeated `full_response += content` re-copies the
    # whole string on every chunk (quadratic for long replies).
    return "".join(parts)


# Example usage
response = stream_chat([
    {"role": "user", "content": "用三句话介绍 Python"}
])
print(f"\n完整响应长度: {len(response)}")
## Node.js 实现
### 基础流式
typescriptimport OpenAI from 'openai'; const client = new OpenAI(); async function streamChat() { const stream = await client.chat.completions.create({ model: 'gpt-4o', messages: [{ role: 'user', content: '讲一个简短的故事' }], stream: true }); for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; process.stdout.write(content); } } streamChat();
### 使用 Stream Helper
typescriptimport OpenAI from 'openai'; const client = new OpenAI(); async function streamWithHelper() { const stream = client.beta.chat.completions.stream({ model: 'gpt-4o', messages: [{ role: 'user', content: 'Hello' }] }); // 事件监听 stream.on('content', (delta, snapshot) => { process.stdout.write(delta); }); // 获取最终结果 const finalResponse = await stream.finalChatCompletion(); console.log('\n完成:', finalResponse.choices[0].message.content); }
## Web 应用集成
### FastAPI + SSE
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
# Use the async client: iterating a *sync* OpenAI stream inside an
# `async def` generator would block the event loop between chunks.
client = AsyncOpenAI()


async def generate_stream(prompt: str):
    """Yield SSE-formatted lines, one per streamed content delta."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        if content:
            yield f"data: {content}\n\n"
    yield "data: [DONE]\n\n"


@app.get("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
    )
### 前端接收 (React)
typescriptasync function streamChat(prompt: string, onChunk: (text: string) => void) { const response = await fetch(`/chat?prompt=${encodeURIComponent(prompt)}`); const reader = response.body?.getReader(); const decoder = new TextDecoder(); while (reader) { const { done, value } = await reader.read(); if (done) break; const text = decoder.decode(value); const lines = text.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const content = line.slice(6); if (content !== '[DONE]') { onChunk(content); } } } } } // React 组件中使用 function ChatComponent() { const [response, setResponse] = useState(''); const handleSend = async () => { setResponse(''); await streamChat('你的问题', (chunk) => { setResponse(prev => prev + chunk); }); }; return ( <div> <button onClick={handleSend}>发送</button> <div>{response}</div> </div> ); }
### Next.js API Route
typescript// app/api/chat/route.ts import { OpenAI } from 'openai'; const client = new OpenAI(); export async function POST(req: Request) { const { messages } = await req.json(); const stream = await client.chat.completions.create({ model: 'gpt-4o', messages, stream: true }); const encoder = new TextEncoder(); const readable = new ReadableStream({ async start(controller) { for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; controller.enqueue(encoder.encode(content)); } controller.close(); } }); return new Response(readable, { headers: { 'Content-Type': 'text/plain; charset=utf-8' } }); }
## Chunk 数据结构
python# 每个 chunk 的结构 { "id": "chatcmpl-xxx", "object": "chat.completion.chunk", "created": 1234567890, "model": "gpt-4o", "choices": [{ "index": 0, "delta": { "content": "你" # 增量内容 }, "finish_reason": null # 最后一个为 "stop" }] }
## 处理完成状态
# Inspect finish_reason to distinguish a normal stop from truncation.
for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="")
    if choice.finish_reason == "stop":
        print("\n[完成]")
    elif choice.finish_reason == "length":
        print("\n[达到长度限制]")
## 流式 + Function Calling
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "北京天气怎么样?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {...}
        }
    }],
    stream=True
)

# Tool-call name/argument data arrives as fragments spread over many
# chunks; accumulate them per tool-call index until the stream ends.
tool_calls = []
for chunk in stream:
    delta = chunk.choices[0].delta

    # Handle tool-call deltas
    if delta.tool_calls:
        for tc in delta.tool_calls:
            # Grow the list until the reported index exists — a plain
            # `if`/append only adds ONE slot and breaks if indices skip.
            while tc.index >= len(tool_calls):
                tool_calls.append({"name": "", "arguments": ""})
            # Guard tc.function: some deltas carry only partial fields.
            if tc.function and tc.function.name:
                tool_calls[tc.index]["name"] = tc.function.name
            if tc.function and tc.function.arguments:
                tool_calls[tc.index]["arguments"] += tc.function.arguments

    # Plain text content can be interleaved with tool-call deltas.
    if delta.content:
        print(delta.content, end="")

print(f"\n工具调用: {tool_calls}")
## 错误处理
from openai import APIError, APIConnectionError

# Catch the most specific SDK exceptions first, then fall back.
try:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[...],
        stream=True,
    )
    for chunk in stream:
        pass  # process each chunk here
except APIConnectionError:
    print("连接失败,请检查网络")
except APIError as e:
    print(f"API 错误: {e}")
except Exception as e:
    print(f"未知错误: {e}")
## 性能优化
### 设置超时
from openai import OpenAI
import httpx

# 60 s overall budget per request, 5 s to establish the connection.
client = OpenAI(timeout=httpx.Timeout(60.0, connect=5.0))
### 重试机制
from tenacity import retry, stop_after_attempt, wait_exponential


# Retry stream *creation* up to 3 times with exponential backoff (1-10 s).
# NOTE(review): this covers only the initial request; a failure while
# iterating the returned stream is not retried here.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
def stream_with_retry(messages):
    return client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
## 下一步
- Function Calling - 让 AI 调用函数
- Embeddings - 文本向量化
- Vision - 图像理解
提示:流式响应是构建聊天应用的核心技术,务必掌握。