logo

Claude API 流式响应

流式响应让你可以实时接收 Claude 的输出,提升用户体验。

你可以把它当作“实时旁白模式”:
模型边想边说,用户更容易感知系统正在处理,而不是页面卡死。

基础用法

Python 流式

# Minimal streaming example: print each text delta as it arrives.
import anthropic

# Default client configuration; credentials come from the environment
# per SDK convention — TODO confirm for your deployment.
client = anthropic.Anthropic()

# The context manager closes the HTTP stream even if iteration stops early.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "讲一个简短的故事"}]
) as stream:
    # text_stream yields only text deltas; flush=True shows each chunk
    # immediately instead of waiting for a newline.
    for text in stream.text_stream:
        print(text, end="", flush=True)

使用事件处理

# Event-level handling: iterate raw stream events instead of text_stream.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            # A content_block_delta may carry non-text deltas (e.g.
            # input_json_delta when tools are enabled), so check the
            # delta type before reading .text — matching the tool-use
            # example later in this guide.
            if event.delta.type == "text_delta":
                print(event.delta.text, end="")
        elif event.type == "message_stop":
            print("\n[完成]")

获取完整响应

# Stream for responsiveness, then collect the fully assembled message.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    # get_final_message() consumes the remainder of the stream and
    # returns the complete Message, including usage accounting.
    response = stream.get_final_message()

print(response.content[0].text)
print(f"输入 tokens: {response.usage.input_tokens}")
print(f"输出 tokens: {response.usage.output_tokens}")

读者导向:上线路径

  1. 先用 text_stream 跑通最小可用流式输出。
  2. 再升级为事件级处理,覆盖 stop reason 与 usage 上报。
  3. 最后补取消、重连、超时控制,保证前后端状态一致。

Node.js 流式

基础流式

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// Stream a reply, echoing each text delta to stdout as it arrives.
async function streamChat() {
  const stream = client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: '讲一个简短的故事' }]
  });

  // The 'text' event fires once per text delta.
  stream.on('text', (text) => {
    process.stdout.write(text);
  });

  // The final message binding was unused — just await completion.
  await stream.finalMessage();
  console.log('\n完成');
}

// A bare streamChat() call leaves a floating promise; surface failures
// instead of triggering an unhandled-rejection warning.
streamChat().catch(console.error);

使用 for await

// Raw event iteration via the lower-level create(..., stream: true) API.
async function streamWithIterator() {
  const stream = await client.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Hello' }],
    stream: true
  });

  // Each event is a typed stream payload; only text deltas are printed —
  // other event types (message_start, message_delta, ...) are skipped.
  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  }
}

事件类型

Claude 流式 API 发送的事件类型:

# 消息开始
{"type": "message_start", "message": {...}}

# 内容块开始
{"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}

# 内容增量
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}}

# 内容块结束
{"type": "content_block_stop", "index": 0}

# 消息增量(usage 信息)
{"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 50}}

# 消息结束
{"type": "message_stop"}

完整事件处理

def handle_stream(messages):
    """Stream a completion for *messages*, printing one line per stream event.

    Demonstrates every event type the streaming API emits. Relies on the
    module-level ``client``.
    """
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=messages
    ) as stream:
        for event in stream:
            match event.type:
                case "message_start":
                    print(f"开始生成 (model: {event.message.model})")

                case "content_block_start":
                    print(f"内容块 {event.index} 开始")

                case "content_block_delta":
                    # Only text deltas carry .text; tool-use streams also
                    # emit input_json_delta, which this handler ignores.
                    if event.delta.type == "text_delta":
                        print(event.delta.text, end="")

                case "content_block_stop":
                    print(f"\n内容块 {event.index} 结束")

                case "message_delta":
                    # message_delta carries the stop reason and the
                    # output-token usage for the message.
                    print(f"\n停止原因: {event.delta.stop_reason}")
                    print(f"输出 tokens: {event.usage.output_tokens}")

                case "message_stop":
                    print("消息完成")

Web 应用集成

FastAPI + SSE

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
# Synchronous SDK client shared by all requests.
client = anthropic.Anthropic()

async def generate_stream(prompt: str):
    """Yield the model's reply to *prompt* as Server-Sent Events.

    Emits one SSE event per streamed chunk, then a final ``[DONE]`` sentinel.
    """
    # NOTE(review): `client` is the synchronous SDK client, so iterating the
    # stream blocks the event loop inside this async generator; consider
    # anthropic.AsyncAnthropic for a truly async endpoint — TODO confirm.
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            # SSE framing: every line of a payload needs its own "data: "
            # prefix — a chunk containing "\n" would otherwise corrupt the
            # event boundary. Multiple data lines of one event are rejoined
            # with "\n" on the client side.
            event = "".join(f"data: {line}\n" for line in text.split("\n"))
            yield event + "\n"

    yield "data: [DONE]\n\n"

@app.get("/chat")
async def chat(prompt: str):
    """Stream a completion for *prompt* over Server-Sent Events."""
    # text/event-stream tells the browser/EventSource to keep the
    # connection open and parse SSE frames.
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )

Next.js API Route

// app/api/chat/route.ts
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// POST /api/chat — proxy a streaming completion as a plain-text stream.
export async function POST(req: Request) {
  const { messages } = await req.json();

  const stream = await client.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages,
    stream: true
  });

  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      try {
        for await (const event of stream) {
          // Forward only text deltas; other event types carry metadata.
          if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
            controller.enqueue(encoder.encode(event.delta.text));
          }
        }
        controller.close();
      } catch (err) {
        // Without this, an upstream failure would leave the HTTP response
        // open forever — the connection-leak pitfall this guide warns about.
        controller.error(err);
      }
    }
  });

  return new Response(readable, {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' }
  });
}

前端接收 (React)

// Read the streamed response body, invoking onChunk for each decoded piece.
async function streamChat(
  messages: Array<{role: string; content: string}>,
  onChunk: (text: string) => void
) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  });

  // Surface HTTP errors instead of silently streaming an error page.
  if (!response.ok) {
    throw new Error(`chat request failed: ${response.status}`);
  }

  const reader = response.body?.getReader();
  if (!reader) return;

  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // { stream: true } buffers partial multi-byte sequences (e.g. UTF-8
    // Chinese text split across chunks) until they are complete; without
    // it a split character decodes to U+FFFD replacement garbage.
    onChunk(decoder.decode(value, { stream: true }));
  }
}

// React 组件
function ChatComponent() {
  const [response, setResponse] = useState('');

  const handleSend = async (prompt: string) => {
    // Clear the previous answer, then append chunks as they stream in.
    setResponse('');
    await streamChat(
      [{ role: 'user', content: prompt }],
      (chunk) => setResponse(prev => prev + chunk)
    );
  };

  return <div>{response}</div>;
}

流式 + Tool Use

# Streaming with tools: tool arguments arrive as incremental JSON fragments.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=[{
        "name": "get_weather",
        "description": "获取天气",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string"}
            },
            "required": ["city"]
        }
    }],
    messages=[{"role": "user", "content": "北京天气怎么样?"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(f"文本: {event.delta.text}")
            elif event.delta.type == "input_json_delta":
                # partial_json is a fragment of the tool-call arguments;
                # fragments must be concatenated to reconstruct the full
                # JSON input object.
                print(f"工具参数: {event.delta.partial_json}")

错误处理

import anthropic

try:
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="")

except anthropic.APIConnectionError:
    print("连接失败")
except anthropic.RateLimitError:
    print("请求过于频繁")
except anthropic.APIStatusError as e:
    print(f"API 错误: {e}")

常见坑

  • 前端只拼文本,不跟踪事件类型导致 UI 状态错位
  • 服务端没做请求中断处理,连接泄露
  • 忽略 usage 统计,难以定位成本异常

性能优化

设置超时

client = anthropic.Anthropic(
    timeout=60.0  # 60-second timeout
)

重试机制

client = anthropic.Anthropic(
    max_retries=3  # retry up to 3 times
)

与 OpenAI 流式的区别

| 特性 | Claude API | OpenAI API |
| --- | --- | --- |
| 流式方法 | `messages.stream()` | `stream=True` |
| 事件结构 | 细粒度事件类型 | 统一 chunk 格式 |
| 上下文管理器 | 支持 `with` 语句 | 需要手动处理 |
| 最终消息 | `get_final_message()` | 需要手动组装 |

下一步


提示:Claude 的流式 API 使用上下文管理器,更加 Pythonic。

Claude API Guide
AI Engineer

Claude API Guide

Build with the Claude API for messages, streaming, multimodal input, and production integrations.

Claude API GuideStreaming

Claude API 流式响应

流式响应让你可以实时接收 Claude 的输出,提升用户体验。

你可以把它当作“实时旁白模式”:
模型边想边说,用户更容易感知系统正在处理,而不是页面卡死。

#基础用法

#Python 流式

python
import anthropic client = anthropic.Anthropic() with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "讲一个简短的故事"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

#使用事件处理

python
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: for event in stream: if event.type == "content_block_delta": print(event.delta.text, end="") elif event.type == "message_stop": print("\n[完成]")

#获取完整响应

python
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: response = stream.get_final_message() print(response.content[0].text) print(f"输入 tokens: {response.usage.input_tokens}") print(f"输出 tokens: {response.usage.output_tokens}")

#读者导向:上线路径

  1. 先用 text_stream 跑通最小可用流式输出。
  2. 再升级为事件级处理,覆盖 stop reason 与 usage 上报。
  3. 最后补取消、重连、超时控制,保证前后端状态一致。

#Node.js 流式

#基础流式

typescript
import Anthropic from '@anthropic-ai/sdk'; const client = new Anthropic(); async function streamChat() { const stream = client.messages.stream({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages: [{ role: 'user', content: '讲一个简短的故事' }] }); stream.on('text', (text) => { process.stdout.write(text); }); const finalMessage = await stream.finalMessage(); console.log('\n完成'); } streamChat();

#使用 for await

typescript
async function streamWithIterator() { const stream = await client.messages.create({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages: [{ role: 'user', content: 'Hello' }], stream: true }); for await (const event of stream) { if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { process.stdout.write(event.delta.text); } } }

#事件类型

Claude 流式 API 发送的事件类型:

python
# 消息开始 {"type": "message_start", "message": {...}} # 内容块开始 {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}} # 内容增量 {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}} # 内容块结束 {"type": "content_block_stop", "index": 0} # 消息增量(usage 信息) {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 50}} # 消息结束 {"type": "message_stop"}

#完整事件处理

python
def handle_stream(messages): with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=messages ) as stream: for event in stream: match event.type: case "message_start": print(f"开始生成 (model: {event.message.model})") case "content_block_start": print(f"内容块 {event.index} 开始") case "content_block_delta": if event.delta.type == "text_delta": print(event.delta.text, end="") case "content_block_stop": print(f"\n内容块 {event.index} 结束") case "message_delta": print(f"\n停止原因: {event.delta.stop_reason}") print(f"输出 tokens: {event.usage.output_tokens}") case "message_stop": print("消息完成")

#Web 应用集成

#FastAPI + SSE

python
from fastapi import FastAPI from fastapi.responses import StreamingResponse import anthropic app = FastAPI() client = anthropic.Anthropic() async def generate_stream(prompt: str): with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": prompt}] ) as stream: for text in stream.text_stream: yield f"data: {text}\n\n" yield "data: [DONE]\n\n" @app.get("/chat") async def chat(prompt: str): return StreamingResponse( generate_stream(prompt), media_type="text/event-stream" )

#Next.js API Route

typescript
// app/api/chat/route.ts import Anthropic from '@anthropic-ai/sdk'; const client = new Anthropic(); export async function POST(req: Request) { const { messages } = await req.json(); const stream = await client.messages.create({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages, stream: true }); const encoder = new TextEncoder(); const readable = new ReadableStream({ async start(controller) { for await (const event of stream) { if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { controller.enqueue(encoder.encode(event.delta.text)); } } controller.close(); } }); return new Response(readable, { headers: { 'Content-Type': 'text/plain; charset=utf-8' } }); }

#前端接收 (React)

typescript
async function streamChat( messages: Array<{role: string; content: string}>, onChunk: (text: string) => void ) { const response = await fetch('/api/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages }) }); const reader = response.body?.getReader(); const decoder = new TextDecoder(); while (reader) { const { done, value } = await reader.read(); if (done) break; onChunk(decoder.decode(value)); } } // React 组件 function ChatComponent() { const [response, setResponse] = useState(''); const handleSend = async (prompt: string) => { setResponse(''); await streamChat( [{ role: 'user', content: prompt }], (chunk) => setResponse(prev => prev + chunk) ); }; return <div>{response}</div>; }

#流式 + Tool Use

python
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, tools=[{ "name": "get_weather", "description": "获取天气", "input_schema": { "type": "object", "properties": { "city": {"type": "string"} }, "required": ["city"] } }], messages=[{"role": "user", "content": "北京天气怎么样?"}] ) as stream: for event in stream: if event.type == "content_block_delta": if event.delta.type == "text_delta": print(f"文本: {event.delta.text}") elif event.delta.type == "input_json_delta": print(f"工具参数: {event.delta.partial_json}")

#错误处理

python
import anthropic try: with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: for text in stream.text_stream: print(text, end="") except anthropic.APIConnectionError: print("连接失败") except anthropic.RateLimitError: print("请求过于频繁") except anthropic.APIStatusError as e: print(f"API 错误: {e}")

#常见坑

  • 前端只拼文本,不跟踪事件类型导致 UI 状态错位
  • 服务端没做请求中断处理,连接泄露
  • 忽略 usage 统计,难以定位成本异常

#性能优化

#设置超时

python
client = anthropic.Anthropic( timeout=60.0 # 60 秒超时 )

#重试机制

python
client = anthropic.Anthropic( max_retries=3 # 最多重试 3 次 )

#与 OpenAI 流式的区别

| 特性 | Claude API | OpenAI API |
| --- | --- | --- |
| 流式方法 | `messages.stream()` | `stream=True` |
| 事件结构 | 细粒度事件类型 | 统一 chunk 格式 |
| 上下文管理器 | 支持 `with` 语句 | 需要手动处理 |
| 最终消息 | `get_final_message()` | 需要手动组装 |

#下一步


提示:Claude 的流式 API 使用上下文管理器,更加 Pythonic。

System Design

Core system design concepts and practical case studies

Learn the trade-offs and patterns that matter in technical interviews.

Open System Design →

Related Roadmaps