
Data: Streaming reference — TypeScript

v2.1.63

TypeScript streaming reference including basic streaming and handling different content types

Streaming — TypeScript

Quick Start

typescript
const stream = client.messages.stream({
  model: "{{OPUS_ID}}",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Write a story" }],
});

for await (const event of stream) {
  if (
    event.type === "content_block_delta" &&
    event.delta.type === "text_delta"
  ) {
    process.stdout.write(event.delta.text);
  }
}
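The delta filter above can be factored into a reusable type guard. A minimal sketch, using simplified local stand-ins for the SDK's event union (the real `MessageStreamEvent` type has more members):

```typescript
// Simplified shapes of the two event variants the quick start cares about.
// These are illustrative stand-ins, not the SDK's full types.
type TextDelta = { type: "text_delta"; text: string };
type ContentBlockDeltaEvent = {
  type: "content_block_delta";
  delta: TextDelta | { type: "input_json_delta"; partial_json: string };
};
type StreamEvent = ContentBlockDeltaEvent | { type: "message_stop" };

// Narrow an arbitrary stream event down to a text delta, or null.
function asTextDelta(event: StreamEvent): TextDelta | null {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    return event.delta;
  }
  return null;
}

const events: StreamEvent[] = [
  { type: "content_block_delta", delta: { type: "text_delta", text: "Hel" } },
  { type: "content_block_delta", delta: { type: "input_json_delta", partial_json: "{" } },
  { type: "content_block_delta", delta: { type: "text_delta", text: "lo" } },
  { type: "message_stop" },
];

// Collect only the text deltas; non-text events fall through.
const text = events
  .map(asTextDelta)
  .filter((d): d is TextDelta => d !== null)
  .map((d) => d.text)
  .join("");
```

In the real loop you would call `asTextDelta(event)` on each iterated event and write the result with `process.stdout.write()` when it is non-null.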

Handling Different Content Types

Opus 4.6: Use thinking: {type: "adaptive"}. On older models, use thinking: {type: "enabled", budget_tokens: N} instead.

typescript
const stream = client.messages.stream({
  model: "{{OPUS_ID}}",
  max_tokens: 16000,
  thinking: { type: "adaptive" },
  messages: [{ role: "user", content: "Analyze this problem" }],
});

for await (const event of stream) {
  switch (event.type) {
    case "content_block_start":
      switch (event.content_block.type) {
        case "thinking":
          console.log("\n[Thinking...]");
          break;
        case "text":
          console.log("\n[Response:]");
          break;
      }
      break;
    case "content_block_delta":
      switch (event.delta.type) {
        case "thinking_delta":
          process.stdout.write(event.delta.thinking);
          break;
        case "text_delta":
          process.stdout.write(event.delta.text);
          break;
      }
      break;
  }
}

Streaming with Tool Use (Tool Runner)

Use the tool runner with stream: true. The outer loop iterates over the tool runner's iterations (messages); the inner loop processes the stream events for each:

typescript
import Anthropic from "@anthropic-ai/sdk";
import { betaZodTool } from "@anthropic-ai/sdk/helpers/beta/zod";
import { z } from "zod";

const client = new Anthropic();

const getWeather = betaZodTool({
  name: "get_weather",
  description: "Get current weather for a location",
  inputSchema: z.object({
    location: z.string().describe("City and state, e.g., San Francisco, CA"),
  }),
  run: async ({ location }) => `72°F and sunny in ${location}`,
});

const runner = client.beta.messages.toolRunner({
  model: "{{OPUS_ID}}",
  max_tokens: 4096,
  tools: [getWeather],
  messages: [
    { role: "user", content: "What's the weather in Paris and London?" },
  ],
  stream: true,
});

// Outer loop: each tool runner iteration
for await (const messageStream of runner) {
  // Inner loop: stream events for this iteration
  for await (const event of messageStream) {
    switch (event.type) {
      case "content_block_delta":
        switch (event.delta.type) {
          case "text_delta":
            process.stdout.write(event.delta.text);
            break;
          case "input_json_delta":
            // Tool input is being streamed
            break;
        }
        break;
    }
  }
}

Getting the Final Message

typescript
const stream = client.messages.stream({
  model: "{{OPUS_ID}}",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Hello" }],
});

for await (const event of stream) {
  // Process events...
}

const finalMessage = await stream.finalMessage();
console.log(`Tokens used: ${finalMessage.usage.output_tokens}`);

Stream Event Types

| Event Type | Description | When it fires |
|---|---|---|
| message_start | Contains message metadata | Once at the beginning |
| content_block_start | New content block beginning | When a text/tool_use block starts |
| content_block_delta | Incremental content update | For each token/chunk |
| content_block_stop | Content block complete | When a block finishes |
| message_delta | Message-level updates (contains stop_reason, usage) | Before message_stop |
| message_stop | Message complete | Once at the end |
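The lifecycle above can be exercised offline by folding a recorded event sequence into the values most callers want: the final text, the stop reason, and the token count. A sketch with hand-written event literals (simplified shapes, not the SDK's types):

```typescript
// Illustrative stand-ins for the streaming event shapes.
type RecordedEvent =
  | { type: "message_start" }
  | { type: "content_block_start"; index: number }
  | { type: "content_block_delta"; index: number; delta: { type: "text_delta"; text: string } }
  | { type: "content_block_stop"; index: number }
  | { type: "message_delta"; delta: { stop_reason: string }; usage: { output_tokens: number } }
  | { type: "message_stop" };

// Fold a complete event sequence into final text, stop reason, and usage.
function foldStream(events: RecordedEvent[]) {
  let text = "";
  let stopReason: string | null = null;
  let outputTokens = 0;
  for (const event of events) {
    switch (event.type) {
      case "content_block_delta":
        text += event.delta.text; // incremental content update
        break;
      case "message_delta":
        stopReason = event.delta.stop_reason; // message-level update
        outputTokens = event.usage.output_tokens;
        break;
    }
  }
  return { text, stopReason, outputTokens };
}

// A sequence in the table's firing order.
const result = foldStream([
  { type: "message_start" },
  { type: "content_block_start", index: 0 },
  { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hello" } },
  { type: "content_block_stop", index: 0 },
  { type: "message_delta", delta: { stop_reason: "end_turn" }, usage: { output_tokens: 12 } },
  { type: "message_stop" },
]);
```

With the SDK you rarely need this by hand: `stream.finalMessage()` returns the assembled message directly.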

Best Practices

  1. Always flush output: use process.stdout.write() for immediate display
  2. Handle partial responses: if the stream is interrupted, you may be left with incomplete content
  3. Track token usage: the message_delta event contains usage information
  4. Use finalMessage(): get the complete Anthropic.Message object even when streaming. Don't wrap .on() events in new Promise(); finalMessage() handles all completion/error/abort states internally
  5. Buffer for web UIs: consider buffering a few tokens before rendering to avoid excessive DOM updates
  6. Use stream.on("text", ...) for deltas: the text event provides just the delta string, simpler than manually filtering content_block_delta events
  7. For agentic loops with streaming: see the Streaming Manual Loop section in tool-use.md for combining stream() + finalMessage() with a tool-use loop
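Best practice 5 (buffering for web UIs) can be sketched as a small coalescing buffer that flushes on a size threshold and once more at end of stream. The `TokenBuffer` name and threshold value are illustrative, not SDK API:

```typescript
// Coalesce streamed text deltas to cut down on render/DOM-update calls.
class TokenBuffer {
  private pending = "";

  constructor(
    private threshold: number, // flush once at least this many characters accumulate
    private render: (chunk: string) => void,
  ) {}

  push(delta: string): void {
    this.pending += delta;
    if (this.pending.length >= this.threshold) this.flush();
  }

  flush(): void {
    if (this.pending.length === 0) return;
    this.render(this.pending);
    this.pending = "";
  }
}

// Usage: render is called far fewer times than deltas arrive.
const chunks: string[] = [];
const buffer = new TokenBuffer(8, (chunk) => chunks.push(chunk));
for (const delta of ["Hel", "lo, ", "wor", "ld!"]) buffer.push(delta);
buffer.flush(); // end of stream: emit whatever is left
```

In a real UI, `render` would update the DOM (or set React state), and `push` would be called from the stream's text-delta handler.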

Raw SSE Format

If using raw HTTP (not an SDK), the stream returns Server-Sent Events:

event: message_start
data: {"type":"message_start","message":{"id":"msg_...","type":"message",...}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":12}}

event: message_stop
data: {"type":"message_stop"}
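Frames in this shape can be decoded with a few lines: split the payload on blank lines, then read each block's `event:` and `data:` fields. A minimal sketch that ignores other SSE features (`id:`, `retry:`, multi-line `data:`), so it is not a general SSE client:

```typescript
type SseFrame = { event: string; data: unknown };

// Parse a raw SSE payload into (event name, parsed JSON data) frames.
function parseSse(raw: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of raw.split("\n\n")) {
    let event = "";
    let data = "";
    for (const line of block.split("\n")) {
      if (line.startsWith("event: ")) event = line.slice("event: ".length);
      else if (line.startsWith("data: ")) data = line.slice("data: ".length);
    }
    if (event && data) frames.push({ event, data: JSON.parse(data) });
  }
  return frames;
}

const raw = [
  "event: content_block_delta",
  'data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}',
  "",
  "event: message_stop",
  'data: {"type":"message_stop"}',
].join("\n");

const frames = parseSse(raw);
```

In production, prefer a real SSE parser (for example, the browser's EventSource or a maintained library), since the wire format also allows comments, partial frames across network chunks, and multi-line data fields.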
