Data: Streaming reference — Python
v2.1.63 — Python streaming reference including sync/async streaming and handling different content types
Streaming — Python
Quick Start
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
Async
```python
async with async_client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    async for text in stream.text_stream:
        print(text, end="", flush=True)
```
Handling Different Content Types
Claude may return text, thinking blocks, or tool use. Handle each appropriately:
Opus 4.6: Use thinking: {type: "adaptive"}. On older models, use thinking: {type: "enabled", budget_tokens: N} instead.
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    thinking={"type": "adaptive"},
    messages=[{"role": "user", "content": "Analyze this problem"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("\n[Thinking...]")
            elif event.content_block.type == "text":
                print("\n[Response:]")
        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
```
Streaming with Tool Use
The Python tool runner currently returns complete messages. If you need per-token streaming with tool use, stream the individual API calls inside a manual loop:
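The shape of that manual loop can be sketched independently of the SDK. Here run_turn and execute_tool are hypothetical stand-ins: run_turn performs one streamed client.messages.stream(...) call and returns its final message (as a plain dict in this sketch), and execute_tool runs one requested tool call.

```python
def tool_use_loop(run_turn, execute_tool, messages, max_turns=10):
    """Stream one turn at a time, run any requested tools, and feed the
    results back until the model stops asking for tools."""
    for _ in range(max_turns):
        response = run_turn(messages)  # one streamed API call
        if response["stop_reason"] != "tool_use":
            return response  # model finished without requesting a tool
        # Echo the assistant turn, then answer each tool call with a tool_result
        messages.append({"role": "assistant", "content": response["content"]})
        tool_results = [
            {"type": "tool_result", "tool_use_id": block["id"],
             "content": execute_tool(block)}
            for block in response["content"] if block["type"] == "tool_use"
        ]
        messages.append({"role": "user", "content": tool_results})
    raise RuntimeError("tool-use loop did not terminate")
```

Each pass through the loop is one streamed call like the snippet below; the max_turns cap guards against a model that keeps requesting tools.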
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=4096,
    tools=tools,
    messages=messages
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    response = stream.get_final_message()
# Continue with tool execution if response.stop_reason == "tool_use"
```
Getting the Final Message
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    # Get the full message after streaming
    final_message = stream.get_final_message()
print(f"\n\nTokens used: {final_message.usage.output_tokens}")
```
Streaming with Progress Updates
```python
def stream_with_progress(client, **kwargs):
    """Stream a response, printing text as it arrives and tracking token usage."""
    total_tokens = 0
    content_parts = []
    with client.messages.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    text = event.delta.text
                    content_parts.append(text)
                    print(text, end="", flush=True)
            elif event.type == "message_delta":
                if event.usage and event.usage.output_tokens is not None:
                    total_tokens = event.usage.output_tokens
    print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)
```
Error Handling in Streams
```python
import anthropic

try:
    with client.messages.stream(
        model="{{OPUS_ID}}",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost. Please retry.")
except anthropic.RateLimitError:
    print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error: {e.status_code}")
```
Stream Event Types
| Event Type | Description | When it fires |
|---|---|---|
| message_start | Contains message metadata | Once at the beginning |
| content_block_start | New content block beginning | When a text/tool_use block starts |
| content_block_delta | Incremental content update | For each token/chunk |
| content_block_stop | Content block complete | When a block finishes |
| message_delta | Message-level updates | Contains stop_reason, usage |
| message_stop | Message complete | Once at the end |
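As a minimal sketch of routing these events, the handler below accumulates text deltas and records the stop_reason; the events here are SimpleNamespace stand-ins with the same attribute shapes, not real SDK event objects:

```python
from types import SimpleNamespace

def handle_event(event, state):
    """Fold one stream event into `state` (parts, stop_reason, text)."""
    if event.type == "message_start":
        state["parts"] = []                          # once at the beginning
    elif event.type == "content_block_delta":
        if event.delta.type == "text_delta":
            state["parts"].append(event.delta.text)  # per token/chunk
    elif event.type == "message_delta":
        state["stop_reason"] = event.delta.stop_reason
    elif event.type == "message_stop":
        state["text"] = "".join(state["parts"])      # once at the end
    return state
```

Feeding every event from `for event in stream` through a handler like this reproduces by hand the accumulation that text_stream and get_final_message() do for you.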
Best Practices
- Always flush output — use flush=True to show tokens immediately
- Handle partial responses — if the stream is interrupted, you may have incomplete content
- Track token usage — the message_delta event contains usage information
- Use timeouts — set appropriate timeouts for your application
- Default to streaming — use .get_final_message() to get the complete response even when streaming, giving you timeout protection without needing to handle individual events
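The "handle partial responses" practice above can be sketched as a small wrapper. This is a generic sketch, not SDK code: chunks stands in for stream.text_stream, and the built-in ConnectionError stands in for the SDK's connection errors.

```python
def consume_stream(chunks):
    """Accumulate streamed text, keeping whatever arrived if the
    stream is interrupted part-way through."""
    parts = []
    complete = False
    try:
        for text in chunks:
            parts.append(text)
        complete = True
    except ConnectionError:
        pass  # keep the partial content collected so far
    return "".join(parts), complete
```

Returning the completeness flag alongside the text lets callers decide whether to retry, resume, or show the partial output with a warning.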