Data: Claude API reference — Python
v2.1.63: Python SDK reference covering installation, client initialization, basic requests, extended thinking, and multi-turn conversations
Claude API — Python
安装
pip install anthropic客户端初始化
import anthropic
# 默认方式(使用 ANTHROPIC_API_KEY 环境变量)
client = anthropic.Anthropic()
# 显式指定 API 密钥
client = anthropic.Anthropic(api_key="your-api-key")
# 异步客户端
async_client = anthropic.AsyncAnthropic()基础消息请求
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
messages=[
{"role": "user", "content": "What is the capital of France?"}
]
)
print(response.content[0].text)系统提示词
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
system="You are a helpful coding assistant. Always provide examples in Python.",
messages=[{"role": "user", "content": "How do I read a JSON file?"}]
)视觉功能(图像)
Base64 编码
import base64
with open("image.png", "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": image_data
}
},
{"type": "text", "text": "What's in this image?"}
]
}]
)URL
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "url",
"url": "https://example.com/image.png"
}
},
{"type": "text", "text": "Describe this image"}
]
}]
)提示词缓存
缓存大型上下文以降低成本(最高可节省 90%)。
自动缓存(推荐)
使用顶层的 cache_control 自动缓存请求中最后一个可缓存的块——无需为单个内容块添加注解:
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
cache_control={"type": "ephemeral"}, # 自动缓存最后一个可缓存的块
system="You are an expert on this large document...",
messages=[{"role": "user", "content": "Summarize the key points"}]
)手动缓存控制
如需细粒度控制,可为特定内容块添加 cache_control:
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
system=[{
"type": "text",
"text": "You are an expert on this large document...",
"cache_control": {"type": "ephemeral"} # 默认 TTL 为 5 分钟
}],
messages=[{"role": "user", "content": "Summarize the key points"}]
)
# 使用显式 TTL(生存时间)
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
system=[{
"type": "text",
"text": "You are an expert on this large document...",
"cache_control": {"type": "ephemeral", "ttl": "1h"} # 1 小时 TTL
}],
messages=[{"role": "user", "content": "Summarize the key points"}]
)扩展思考
Opus 4.6 和 Sonnet 4.6: 使用自适应思考。
budget_tokens在 Opus 4.6 和 Sonnet 4.6 上均已弃用。 旧模型: 使用thinking: {type: "enabled", budget_tokens: N}(必须小于max_tokens,最小为 1024)。
# Opus 4.6:自适应思考(推荐)
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=16000,
thinking={"type": "adaptive"},
output_config={"effort": "high"}, # low | medium | high | max
messages=[{"role": "user", "content": "Solve this step by step..."}]
)
# 访问思考过程和响应
for block in response.content:
if block.type == "thinking":
print(f"Thinking: {block.thinking}")
elif block.type == "text":
print(f"Response: {block.text}")错误处理
import anthropic
try:
response = client.messages.create(...)
except anthropic.BadRequestError as e:
print(f"Bad request: {e.message}")
except anthropic.AuthenticationError:
print("Invalid API key")
except anthropic.PermissionDeniedError:
print("API key lacks required permissions")
except anthropic.NotFoundError:
print("Invalid model or endpoint")
except anthropic.RateLimitError as e:
retry_after = int(e.response.headers.get("retry-after", "60"))
print(f"Rate limited. Retry after {retry_after}s.")
except anthropic.APIStatusError as e:
if e.status_code >= 500:
print(f"Server error ({e.status_code}). Retry later.")
else:
print(f"API error: {e.message}")
except anthropic.APIConnectionError:
print("Network error. Check internet connection.")多轮对话
API 是无状态的——每次都需要发送完整的对话历史。
class ConversationManager:
"""管理与 Claude API 的多轮对话。"""
def __init__(self, client: anthropic.Anthropic, model: str, system: str = None):
self.client = client
self.model = model
self.system = system
self.messages = []
def send(self, user_message: str, **kwargs) -> str:
"""发送消息并获取响应。"""
self.messages.append({"role": "user", "content": user_message})
response = self.client.messages.create(
model=self.model,
max_tokens=kwargs.get("max_tokens", 1024),
system=self.system,
messages=self.messages,
**kwargs
)
assistant_message = response.content[0].text
self.messages.append({"role": "assistant", "content": assistant_message})
return assistant_message
# 使用示例
conversation = ConversationManager(
client=anthropic.Anthropic(),
model="{\{OPUS_ID}\}",
system="You are a helpful assistant."
)
response1 = conversation.send("My name is Alice.")
response2 = conversation.send("What's my name?") # Claude 记得 "Alice"规则:
- 消息必须在
user和assistant之间交替 - 第一条消息必须是
user
压缩(长对话)
Beta 功能,仅限 Opus 4.6。 当对话接近 200K 上下文窗口时,压缩功能会自动在服务器端汇总早期上下文。API 会返回一个
compaction块;你必须在后续请求中将其传回——追加response.content,而不仅仅是文本。
import anthropic
client = anthropic.Anthropic()
messages = []
def chat(user_message: str) -> str:
messages.append({"role": "user", "content": user_message})
response = client.beta.messages.create(
betas=["compact-2026-01-12"],
model="{\{OPUS_ID}\}",
max_tokens=4096,
messages=messages,
context_management={
"edits": [{"type": "compact_20260112"}]
}
)
# 追加完整内容——必须保留压缩块
messages.append({"role": "assistant", "content": response.content})
return next(block.text for block in response.content if block.type == "text")
# 当上下文变得很大时,压缩会自动触发
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))停止原因
响应中的 stop_reason 字段指示模型停止生成的原因:
| 值 | 含义 |
|---|---|
end_turn | Claude 自然完成了响应 |
max_tokens | 达到了 max_tokens 限制——增加该值或使用流式传输 |
stop_sequence | 遇到了自定义停止序列 |
tool_use | Claude 想要调用工具——执行它并继续 |
pause_turn | 模型暂停,可以恢复(智能体流程) |
refusal | Claude 出于安全原因拒绝——输出可能不符合你的模式 |
成本优化策略
1. 对重复上下文使用提示词缓存
# 自动缓存(最简单——缓存最后一个可缓存的块)
response = client.messages.create(
model="{\{OPUS_ID}\}",
max_tokens=1024,
cache_control={"type": "ephemeral"},
system=large_document_text, # 例如,50KB 的上下文
messages=[{"role": "user", "content": "Summarize the key points"}]
)
# 第一次请求:完整成本
# 后续请求:缓存部分约便宜 90%2. 选择合适的模型
# 大多数任务默认使用 Opus
response = client.messages.create(
model="{\{OPUS_ID}\}", # $5.00/$25.00 每 100 万 token
max_tokens=1024,
messages=[{"role": "user", "content": "Explain quantum computing"}]
)
# 高容量生产工作负载使用 Sonnet
standard_response = client.messages.create(
model="{\{SONNET_ID}\}", # $3.00/$15.00 每 100 万 token
max_tokens=1024,
messages=[{"role": "user", "content": "Summarize this document"}]
)
# 仅对简单、速度关键的任务使用 Haiku
simple_response = client.messages.create(
model="{\{HAIKU_ID}\}", # $1.00/$5.00 每 100 万 token
max_tokens=256,
messages=[{"role": "user", "content": "Classify this as positive or negative"}]
)3. 在请求前使用 token 计数
count_response = client.messages.count_tokens(
model="{\{OPUS_ID}\}",
messages=messages,
system=system
)
estimated_input_cost = count_response.input_tokens * 0.000005 # $5/1M tokens
print(f"Estimated input cost: \${estimated_input_cost:.4f}")指数退避重试
注意: Anthropic SDK 会自动对速率限制(429)和服务器错误(5xx)进行指数退避重试。你可以通过
max_retries(默认值:2)配置此行为。仅当需要 SDK 提供之外的行为时才实现自定义重试逻辑。
import time
import random
import anthropic
def call_with_retry(
client: anthropic.Anthropic,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
**kwargs
):
"""使用指数退避重试调用 API。"""
last_exception = None
for attempt in range(max_retries):
try:
return client.messages.create(**kwargs)
except anthropic.RateLimitError as e:
last_exception = e
except anthropic.APIStatusError as e:
if e.status_code >= 500:
last_exception = e
else:
raise # 客户端错误(4xx,429 除外)不应重试
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
print(f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s")
time.sleep(delay)
raise last_exception英文原文 / English Original
Claude API — Python
Installation
pip install anthropic
Client Initialization
import anthropic
# Default (uses ANTHROPIC_API_KEY env var)
client = anthropic.Anthropic()
# Explicit API key
client = anthropic.Anthropic(api_key="your-api-key")
# Async client
async_client = anthropic.AsyncAnthropic()
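The async client exposes the same messages.create call, awaited; a minimal sketch, assuming an asyncio entry point:
import asyncio

async def main():
    async_client = anthropic.AsyncAnthropic()
    response = await async_client.messages.create(
        model="{{OPUS_ID}}",
        max_tokens=1024,
        messages=[{"role": "user", "content": "What is the capital of France?"}]
    )
    print(response.content[0].text)

asyncio.run(main())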
Basic Message Request
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "What is the capital of France?"}
    ]
)
print(response.content[0].text)
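Beyond the text, the response object carries metadata; the fields below (stop_reason and the usage token counts) are part of the standard Message object:
print(response.stop_reason)          # e.g., "end_turn" (see Stop Reasons below)
print(response.usage.input_tokens)   # prompt tokens billed
print(response.usage.output_tokens)  # completion tokens billed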
System Prompts
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    system="You are a helpful coding assistant. Always provide examples in Python.",
    messages=[{"role": "user", "content": "How do I read a JSON file?"}]
)
Vision (Images)
Base64
import base64
with open("image.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_data
                }
            },
            {"type": "text", "text": "What's in this image?"}
        ]
    }]
)
URL
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "url",
                    "url": "https://example.com/image.png"
                }
            },
            {"type": "text", "text": "Describe this image"}
        ]
    }]
)
Prompt Caching
Cache large context to reduce costs (up to 90% savings).
Automatic Caching (Recommended)
Use top-level cache_control to automatically cache the last cacheable block in the request — no need to annotate individual content blocks:
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    cache_control={"type": "ephemeral"},  # auto-caches the last cacheable block
    system="You are an expert on this large document...",
    messages=[{"role": "user", "content": "Summarize the key points"}]
)
Manual Cache Control
For fine-grained control, add cache_control to specific content blocks:
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": "You are an expert on this large document...",
        "cache_control": {"type": "ephemeral"}  # default TTL is 5 minutes
    }],
    messages=[{"role": "user", "content": "Summarize the key points"}]
)
# With explicit TTL (time-to-live)
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": "You are an expert on this large document...",
        "cache_control": {"type": "ephemeral", "ttl": "1h"}  # 1 hour TTL
    }],
    messages=[{"role": "user", "content": "Summarize the key points"}]
)
Extended Thinking
Opus 4.6 and Sonnet 4.6: Use adaptive thinking. budget_tokens is deprecated on both Opus 4.6 and Sonnet 4.6.
Older models: Use thinking={"type": "enabled", "budget_tokens": N} (N must be less than max_tokens, minimum 1024).
# Opus 4.6: adaptive thinking (recommended)
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    thinking={"type": "adaptive"},
    output_config={"effort": "high"},  # low | medium | high | max
    messages=[{"role": "user", "content": "Solve this step by step..."}]
)
# Access thinking and response
for block in response.content:
    if block.type == "thinking":
        print(f"Thinking: {block.thinking}")
    elif block.type == "text":
        print(f"Response: {block.text}")
Error Handling
import anthropic
try:
    response = client.messages.create(...)
except anthropic.BadRequestError as e:
    print(f"Bad request: {e.message}")
except anthropic.AuthenticationError:
    print("Invalid API key")
except anthropic.PermissionDeniedError:
    print("API key lacks required permissions")
except anthropic.NotFoundError:
    print("Invalid model or endpoint")
except anthropic.RateLimitError as e:
    retry_after = int(e.response.headers.get("retry-after", "60"))
    print(f"Rate limited. Retry after {retry_after}s.")
except anthropic.APIStatusError as e:
    if e.status_code >= 500:
        print(f"Server error ({e.status_code}). Retry later.")
    else:
        print(f"API error: {e.message}")
except anthropic.APIConnectionError:
    print("Network error. Check internet connection.")
Multi-Turn Conversations
The API is stateless — send the full conversation history each time.
class ConversationManager:
    """Manage multi-turn conversations with the Claude API."""
    def __init__(self, client: anthropic.Anthropic, model: str, system: str | None = None):
        self.client = client
        self.model = model
        self.system = system
        self.messages = []

    def send(self, user_message: str, **kwargs) -> str:
        """Send a message and get a response."""
        self.messages.append({"role": "user", "content": user_message})
        kwargs.setdefault("max_tokens", 1024)  # avoid passing max_tokens twice via **kwargs
        if self.system is not None:
            kwargs.setdefault("system", self.system)  # omit system entirely when unset
        response = self.client.messages.create(
            model=self.model,
            messages=self.messages,
            **kwargs
        )
        assistant_message = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_message})
        return assistant_message

# Usage
conversation = ConversationManager(
    client=anthropic.Anthropic(),
    model="{{OPUS_ID}}",
    system="You are a helpful assistant."
)
response1 = conversation.send("My name is Alice.")
response2 = conversation.send("What's my name?")  # Claude remembers "Alice"
Rules:
- Messages must alternate between user and assistant
- The first message must be user
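For example, a well-formed history passed as messages looks like this (an illustrative sketch):
messages = [
    {"role": "user", "content": "My name is Alice."},            # first message is user
    {"role": "assistant", "content": "Nice to meet you, Alice!"},
    {"role": "user", "content": "What's my name?"}                # roles alternate
]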
Compaction (long conversations)
Beta, Opus 4.6 only. When conversations approach the 200K context window, compaction automatically summarizes earlier context server-side. The API returns a compaction block; you must pass it back on subsequent requests — append response.content, not just the text.
import anthropic
client = anthropic.Anthropic()
messages = []
def chat(user_message: str) -> str:
    messages.append({"role": "user", "content": user_message})
    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="{{OPUS_ID}}",
        max_tokens=4096,
        messages=messages,
        context_management={
            "edits": [{"type": "compact_20260112"}]
        }
    )
    # Append full content — compaction blocks must be preserved
    messages.append({"role": "assistant", "content": response.content})
    return next(block.text for block in response.content if block.type == "text")

# Compaction triggers automatically when context grows large
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))
Stop Reasons
The stop_reason field in the response indicates why the model stopped generating:
| Value | Meaning |
|---|---|
| end_turn | Claude finished its response naturally |
| max_tokens | Hit the max_tokens limit — increase it or use streaming |
| stop_sequence | Hit a custom stop sequence |
| tool_use | Claude wants to call a tool — execute it and continue |
| pause_turn | Model paused and can be resumed (agentic flows) |
| refusal | Claude refused for safety reasons — output may not match your schema |
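A minimal dispatch sketch over these values; the handling shown is illustrative rather than prescriptive:
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a long essay"}]
)
if response.stop_reason == "max_tokens":
    # Truncated output: retry with a larger max_tokens or use streaming
    ...
elif response.stop_reason == "tool_use":
    # Execute the requested tool, then continue the conversation with its result
    ...
elif response.stop_reason == "end_turn":
    print(response.content[0].text)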
Cost Optimization Strategies
1. Use Prompt Caching for Repeated Context
# Automatic caching (simplest — caches the last cacheable block)
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    cache_control={"type": "ephemeral"},
    system=large_document_text,  # e.g., 50KB of context
    messages=[{"role": "user", "content": "Summarize the key points"}]
)
# First request: full cost
# Subsequent requests: ~90% cheaper for cached portion
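To verify that caching is taking effect, you can inspect the cache fields that the Messages API reports on response.usage; a small sketch:
# First request populates the cache; later requests read from it
print(response.usage.cache_creation_input_tokens)  # > 0 on the request that writes the cache
print(response.usage.cache_read_input_tokens)      # > 0 on subsequent cache hits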
2. Choose the Right Model
# Default to Opus for most tasks
response = client.messages.create(
    model="{{OPUS_ID}}",  # $5.00/$25.00 per 1M tokens
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain quantum computing"}]
)
# Use Sonnet for high-volume production workloads
standard_response = client.messages.create(
    model="{{SONNET_ID}}",  # $3.00/$15.00 per 1M tokens
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize this document"}]
)
# Use Haiku only for simple, speed-critical tasks
simple_response = client.messages.create(
    model="{{HAIKU_ID}}",  # $1.00/$5.00 per 1M tokens
    max_tokens=256,
    messages=[{"role": "user", "content": "Classify this as positive or negative"}]
)
3. Use Token Counting Before Requests
count_response = client.messages.count_tokens(
    model="{{OPUS_ID}}",
    messages=messages,
    system=system
)
estimated_input_cost = count_response.input_tokens * 0.000005  # $5/1M tokens
print(f"Estimated input cost: ${estimated_input_cost:.4f}")
Retry with Exponential Backoff
Note: The Anthropic SDK automatically retries rate limit (429) and server errors (5xx) with exponential backoff. You can configure this with max_retries (default: 2). Only implement custom retry logic if you need behavior beyond what the SDK provides.
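Configuring the built-in retries is usually sufficient; a brief sketch:
# Raise the SDK's built-in retry count (covers 429s and 5xx errors)
client = anthropic.Anthropic(max_retries=5)

# Or override per request
response = client.with_options(max_retries=5).messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
)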
import time
import random
import anthropic
def call_with_retry(
    client: anthropic.Anthropic,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs
):
    """Call the API with exponential backoff retry."""
    last_exception = None
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)
        except anthropic.RateLimitError as e:
            last_exception = e
        except anthropic.APIStatusError as e:
            if e.status_code >= 500:
                last_exception = e
            else:
                raise  # Client errors (4xx except 429) should not be retried
        delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
        print(f"Retry {attempt + 1}/{max_retries} after {delay:.1f}s")
        time.sleep(delay)
    raise last_exception
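Usage mirrors messages.create, with the client passed explicitly:
response = call_with_retry(
    client,
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
)
print(response.content[0].text)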