
Data: Streaming reference — Python

v2.1.63

Python streaming reference including sync/async streaming and handling different content types

Streaming — Python

Quick Start

python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Async

python
async with async_client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    async for text in stream.text_stream:
        print(text, end="", flush=True)
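
The async variant has to run inside an event loop. A minimal driver sketch, assuming an `AsyncAnthropic` client named `async_client` already exists; the `collect_text_stream` helper is hypothetical and works with any async iterator of text chunks:

```python
import asyncio

async def collect_text_stream(text_stream):
    # Print chunks as they arrive and return the joined text.
    parts = []
    async for text in text_stream:
        print(text, end="", flush=True)
        parts.append(text)
    return "".join(parts)

# Hypothetical entry point (requires a configured async_client):
# async def main():
#     async with async_client.messages.stream(...) as stream:
#         story = await collect_text_stream(stream.text_stream)
# asyncio.run(main())
```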

Handling Different Content Types

Claude may return text, thinking blocks, or tool use. Handle each appropriately:

Opus 4.6: Use thinking: {type: "adaptive"}. On older models, use thinking: {type: "enabled", budget_tokens: N} instead.

python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    thinking={"type": "adaptive"},
    messages=[{"role": "user", "content": "Analyze this problem"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("\n[Thinking...]")
            elif event.content_block.type == "text":
                print("\n[Response:]")

        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
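
The branching above can be factored into a small dispatcher that accumulates each block type into its own buffer. A sketch; `route_delta` is an assumption (not SDK API), and the event objects only need to mirror the SDK's `.type`/`.delta` shape:

```python
def route_delta(event, buffers):
    """Append thinking/text deltas to the matching buffer.

    `buffers` is a dict like {"thinking": [], "text": []}; `event`
    mirrors the SDK's streaming event shape (.type, .delta).
    """
    if event.type == "content_block_delta":
        if event.delta.type == "thinking_delta":
            buffers["thinking"].append(event.delta.thinking)
        elif event.delta.type == "text_delta":
            buffers["text"].append(event.delta.text)
```

Inside the `for event in stream:` loop you would call `route_delta(event, buffers)` and then join each buffer afterward.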

Streaming with Tool Use

The Python tool runner currently returns complete messages. If you need per-token streaming with tool use, stream the individual API calls inside a manual loop:

python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=4096,
    tools=tools,
    messages=messages
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    response = stream.get_final_message()
    # Continue with tool execution if response.stop_reason == "tool_use"
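
Continuing after `stop_reason == "tool_use"` means converting each `tool_use` block in the final message into a `tool_result` block for the next request. A minimal sketch; `execute_tool(name, input)` is a hypothetical callback you would supply, and plain objects stand in for SDK block types:

```python
def build_tool_result_turn(message, execute_tool):
    # Collect one tool_result per tool_use block, preserving the block id
    # so the API can match each result to its call.
    results = []
    for block in message.content:
        if block.type == "tool_use":
            results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": execute_tool(block.name, block.input),
            })
    return {"role": "user", "content": results}
```

In the manual loop you would append the assistant message and this user turn to `messages`, then stream the next call.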

Getting the Final Message

python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get the full message after streaming
    final_message = stream.get_final_message()
    print(f"\n\nTokens used: {final_message.usage.output_tokens}")

Streaming with Progress Updates

python
def stream_with_progress(client, **kwargs):
    """Stream a response while printing progress updates."""
    total_tokens = 0
    content_parts = []

    with client.messages.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    text = event.delta.text
                    content_parts.append(text)
                    print(text, end="", flush=True)

            elif event.type == "message_delta":
                if event.usage and event.usage.output_tokens is not None:
                    total_tokens = event.usage.output_tokens

        final_message = stream.get_final_message()

    print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)

Error Handling in Streams

python
try:
    with client.messages.stream(
        model="{{OPUS_ID}}",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost. Please retry.")
except anthropic.RateLimitError:
    print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error: {e.status_code}")
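
The connection and rate-limit cases above are usually worth retrying with exponential backoff. A generic sketch; the `stream_with_retries` helper is an assumption, and in practice you would pass it exception types such as `(anthropic.APIConnectionError, anthropic.RateLimitError)`:

```python
import time

def stream_with_retries(run_stream, retryable, attempts=3, base_delay=1.0):
    # Call run_stream(), retrying on the given exception types with
    # exponential backoff (base_delay, 2*base_delay, ...); re-raise
    # after the final attempt.
    for attempt in range(attempts):
        try:
            return run_stream()
        except retryable:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```

Here `run_stream` would be a zero-argument callable wrapping the whole `with client.messages.stream(...)` block.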

Stream Event Types

| Event type | Description | When it fires |
| --- | --- | --- |
| message_start | Contains message metadata | Once, at the beginning |
| content_block_start | A new content block begins | When a text/tool_use block starts |
| content_block_delta | Incremental content update | For each token/chunk |
| content_block_stop | A content block is complete | When a block finishes |
| message_delta | Message-level updates | Contains stop_reason, usage |
| message_stop | Message complete | Once, at the end |

Best Practices

  1. Always flush output — use flush=True so tokens appear immediately
  2. Handle partial responses — if the stream is interrupted, you may have incomplete content
  3. Track token usage — the message_delta event contains usage information
  4. Use timeouts — set timeouts appropriate for your application
  5. Default to streaming — use .get_final_message() to get the complete response even when streaming; this gives you timeout protection without having to handle individual events
