参考数据:消息批处理 API 参考 — Python
Data: Message Batches API reference — Python
v2.1.63 Python Batches API reference including batch creation, status polling, and result retrieval at 50% cost
消息批量处理 API — Python
批量处理 API(POST /v1/messages/batches)以异步方式处理 Messages API 请求,价格为标准价格的 50%。
关键信息
- 每批次最多 100,000 个请求或 256 MB
- 大多数批次在 1 小时内完成;最长不超过 24 小时
- 创建后 29 天内可获取结果
- 所有 token 使用成本降低 50%
- 支持所有 Messages API 功能(视觉、工具、缓存等)
创建批次
python
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()
message_batch = client.messages.batches.create(
requests=[
Request(
custom_id="request-1",
params=MessageCreateParamsNonStreaming(
model="{{OPUS_ID}}",
max_tokens=1024,
messages=[{"role": "user", "content": "Summarize climate change impacts"}]
)
),
Request(
custom_id="request-2",
params=MessageCreateParamsNonStreaming(
model="{{OPUS_ID}}",
max_tokens=1024,
messages=[{"role": "user", "content": "Explain quantum computing basics"}]
)
),
]
)
print(f"Batch ID: {message_batch.id}")
print(f"Status: {message_batch.processing_status}")
轮询完成状态
python
import time
while True:
batch = client.messages.batches.retrieve(message_batch.id)
if batch.processing_status == "ended":
break
print(f"Status: {batch.processing_status}, processing: {batch.request_counts.processing}")
time.sleep(60)
print("Batch complete!")
print(f"Succeeded: {batch.request_counts.succeeded}")
print(f"Errored: {batch.request_counts.errored}")
获取结果
注意: 以下示例使用 match/case 语法,需要 Python 3.10+。对于更早的版本,请使用 if/elif 链。
python
for result in client.messages.batches.results(message_batch.id):
match result.result.type:
case "succeeded":
print(f"[{result.custom_id}] {result.result.message.content[0].text[:100]}")
case "errored":
if result.result.error.type == "invalid_request":
print(f"[{result.custom_id}] Validation error - fix request and retry")
else:
print(f"[{result.custom_id}] Server error - safe to retry")
case "canceled":
print(f"[{result.custom_id}] Canceled")
case "expired":
print(f"[{result.custom_id}] Expired - resubmit")
取消批次
python
cancelled = client.messages.batches.cancel(message_batch.id)
print(f"Status: {cancelled.processing_status}") # "canceling"
使用提示缓存的批次
python
shared_system = [
{"type": "text", "text": "You are a literary analyst."},
{
"type": "text",
"text": large_document_text, # 在所有请求间共享
"cache_control": {"type": "ephemeral"}
}
]
message_batch = client.messages.batches.create(
requests=[
Request(
custom_id=f"analysis-{i}",
params=MessageCreateParamsNonStreaming(
model="{{OPUS_ID}}",
max_tokens=1024,
system=shared_system,
messages=[{"role": "user", "content": question}]
)
)
for i, question in enumerate(questions)
]
)
完整端到端示例
python
import anthropic
import time
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()
# 1. 准备请求
items_to_classify = [
"The product quality is excellent!",
"Terrible customer service, never again.",
"It's okay, nothing special.",
]
requests = [
Request(
custom_id=f"classify-{i}",
params=MessageCreateParamsNonStreaming(
model="{{HAIKU_ID}}",
max_tokens=50,
messages=[{
"role": "user",
"content": f"Classify as positive/negative/neutral (one word): {text}"
}]
)
)
for i, text in enumerate(items_to_classify)
]
# 2. 创建批次
batch = client.messages.batches.create(requests=requests)
print(f"Created batch: {batch.id}")
# 3. 等待完成
while True:
batch = client.messages.batches.retrieve(batch.id)
if batch.processing_status == "ended":
break
time.sleep(10)
# 4. 收集结果
results = {}
for result in client.messages.batches.results(batch.id):
if result.result.type == "succeeded":
results[result.custom_id] = result.result.message.content[0].text
for custom_id, classification in sorted(results.items()):
print(f"{custom_id}: {classification}")
英文原文 / English Original
Message Batches API — Python
The Batches API (POST /v1/messages/batches) processes Messages API requests asynchronously at 50% of standard prices.
Key Facts
- Up to 100,000 requests or 256 MB per batch
- Most batches complete within 1 hour; maximum 24 hours
- Results available for 29 days after creation
- 50% cost reduction on all token usage
- All Messages API features supported (vision, tools, caching, etc.)
Create a Batch
python
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()
message_batch = client.messages.batches.create(
requests=[
Request(
custom_id="request-1",
params=MessageCreateParamsNonStreaming(
model="{{OPUS_ID}}",
max_tokens=1024,
messages=[{"role": "user", "content": "Summarize climate change impacts"}]
)
),
Request(
custom_id="request-2",
params=MessageCreateParamsNonStreaming(
model="{{OPUS_ID}}",
max_tokens=1024,
messages=[{"role": "user", "content": "Explain quantum computing basics"}]
)
),
]
)
print(f"Batch ID: {message_batch.id}")
print(f"Status: {message_batch.processing_status}")
Poll for Completion
python
import time
while True:
batch = client.messages.batches.retrieve(message_batch.id)
if batch.processing_status == "ended":
break
print(f"Status: {batch.processing_status}, processing: {batch.request_counts.processing}")
time.sleep(60)
print("Batch complete!")
print(f"Succeeded: {batch.request_counts.succeeded}")
print(f"Errored: {batch.request_counts.errored}")
Retrieve Results
Note: Examples below use match/case syntax, requiring Python 3.10+. For earlier versions, use if/elif chains instead.
python
for result in client.messages.batches.results(message_batch.id):
match result.result.type:
case "succeeded":
print(f"[{result.custom_id}] {result.result.message.content[0].text[:100]}")
case "errored":
if result.result.error.type == "invalid_request":
print(f"[{result.custom_id}] Validation error - fix request and retry")
else:
print(f"[{result.custom_id}] Server error - safe to retry")
case "canceled":
print(f"[{result.custom_id}] Canceled")
case "expired":
print(f"[{result.custom_id}] Expired - resubmit")
Cancel a Batch
python
cancelled = client.messages.batches.cancel(message_batch.id)
print(f"Status: {cancelled.processing_status}") # "canceling"
Batch with Prompt Caching
python
shared_system = [
{"type": "text", "text": "You are a literary analyst."},
{
"type": "text",
"text": large_document_text, # Shared across all requests
"cache_control": {"type": "ephemeral"}
}
]
message_batch = client.messages.batches.create(
requests=[
Request(
custom_id=f"analysis-{i}",
params=MessageCreateParamsNonStreaming(
model="{{OPUS_ID}}",
max_tokens=1024,
system=shared_system,
messages=[{"role": "user", "content": question}]
)
)
for i, question in enumerate(questions)
]
)
Full End-to-End Example
python
import anthropic
import time
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request
client = anthropic.Anthropic()
# 1. Prepare requests
items_to_classify = [
"The product quality is excellent!",
"Terrible customer service, never again.",
"It's okay, nothing special.",
]
requests = [
Request(
custom_id=f"classify-{i}",
params=MessageCreateParamsNonStreaming(
model="{{HAIKU_ID}}",
max_tokens=50,
messages=[{
"role": "user",
"content": f"Classify as positive/negative/neutral (one word): {text}"
}]
)
)
for i, text in enumerate(items_to_classify)
]
# 2. Create batch
batch = client.messages.batches.create(requests=requests)
print(f"Created batch: {batch.id}")
# 3. Wait for completion
while True:
batch = client.messages.batches.retrieve(batch.id)
if batch.processing_status == "ended":
break
time.sleep(10)
# 4. Collect results
results = {}
for result in client.messages.batches.results(batch.id):
if result.result.type == "succeeded":
results[result.custom_id] = result.result.message.content[0].text
for custom_id, classification in sorted(results.items()):
print(f"{custom_id}: {classification}")