Appearance
Streaming
在与大语言模型交互时,等待整个响应生成完毕再一次性返回(同步模式)会带来明显的延迟感——用户可能需要等待数秒甚至更长时间才能看到第一个字。Streaming(流式传输) 解决的正是这个问题:模型一边生成内容,一边将结果以事件流(Server-Sent Events, SSE)的形式逐步推送给客户端。
流式传输带来的核心价值有三个。第一是感知延迟大幅降低:用户在几百毫秒内就能看到第一个字开始输出,而不是干等几秒。第二是更好的用户体验:类似 ChatGPT 的逐字打字效果,让用户感觉 AI 正在"思考并回答",互动感更强。第三是支持实时处理:你可以在模型还在生成的过程中就开始处理已接收到的内容,比如实时翻译、实时搜索联想等场景。
本章将详细介绍 Streaming 的工作原理、核心事件类型、Python 和 JavaScript 的实现方式,以及在 Function Calling 等复杂场景下的流式处理技巧。
SSE 事件流程
流式传输基于 Server-Sent Events (SSE) 协议。当你开启 stream=True 后,API 不再返回一个完整的 JSON 响应,而是通过一个持久的 HTTP 连接逐步推送一系列事件。每个事件都有一个 type 字段标识其类型,客户端根据事件类型做出相应处理。
下面是一次典型的流式响应中,事件的触发顺序:
SSE 流式传输事件流程
response.created
响应对象创建
response.in_progress
开始生成
response.output_text.delta
逐字输出文本片段
response.output_text.done
文本输出完成
response.completed
整个响应完成
在这个流程中,response.output_text.delta 事件会被触发多次——每生成几个字(或一个 token)就推送一次。这是你在客户端实现"逐字打字效果"的核心事件。而 response.completed 事件标志着整个生成过程结束,此时你可以执行清理操作,比如更新 UI 状态、记录日志或统计 token 用量。
基础实现
开启流式传输非常简单,只需在 API 调用时加上 stream=True(Python)或 stream: true(JavaScript)参数即可。SDK 会自动处理底层的 SSE 连接和事件解析,你只需要遍历事件流并根据事件类型做出响应。
以下是 Python 和 JavaScript 两种语言的基本实现。注意 Python 中使用 for event in stream 遍历,而 JavaScript 中使用 for await...of 异步遍历。
python
from openai import OpenAI
client = OpenAI()
stream = client.responses.create(
model="gpt-4.1",
input="写一首关于春天的短诗",
stream=True
)
for event in stream:
if event.type == "response.output_text.delta":
print(event.delta, end="", flush=True)
elif event.type == "response.completed":
print("\n\n--- 生成完成 ---")javascript
import OpenAI from "openai";
const client = new OpenAI();
const stream = await client.responses.create({
model: "gpt-4.1",
input: "写一首关于春天的短诗",
stream: true,
});
for await (const event of stream) {
if (event.type === "response.output_text.delta") {
process.stdout.write(event.delta);
} else if (event.type === "response.completed") {
console.log("\n\n--- 生成完成 ---");
}
}python
from openai import AsyncOpenAI
import asyncio
client = AsyncOpenAI()
async def main():
stream = await client.responses.create(
model="gpt-4.1",
input="写一首关于春天的短诗",
stream=True
)
async for event in stream:
if event.type == "response.output_text.delta":
print(event.delta, end="", flush=True)
elif event.type == "response.completed":
print("\n\n--- 生成完成 ---")
# 获取 token 使用情况
usage = event.response.usage
print(f"输入 token: {usage.input_tokens}")
print(f"输出 token: {usage.output_tokens}")
asyncio.run(main())在上面的代码中,最关键的是对 response.output_text.delta 事件的处理。每次收到这个事件时,event.delta 包含一小段新生成的文本(通常是几个字符到一个词),我们用 print(..., end="", flush=True) 或 process.stdout.write() 立即输出,不换行、不缓冲,实现逐字显示效果。
response.completed 事件在所有内容生成完毕后触发一次,它携带完整的 response 对象,包括 token 使用统计等元信息。这是执行收尾工作的理想时机。
核心事件类型
在流式传输中,你会遇到多种事件类型。了解每种事件的触发时机和携带数据,才能正确实现各种功能。以下是最常用的事件类型及其用途:
| 事件类型 | 触发时机 | 常用字段 | 用途 |
|---|---|---|---|
| response.created | 响应创建时 | response | 初始化 UI |
| response.output_text.delta | 每个文本片段 | delta | 实时显示文本 |
| response.output_text.done | 文本完成 | text | 获取完整文本 |
| response.function_call_arguments.delta | 函数参数片段 | delta | 流式函数调用 |
| response.completed | 响应完成 | response | 清理和收尾 |
| error | 出错时 | message | 错误处理 |
在实际开发中,你并不需要处理所有事件类型。对于最简单的文本输出场景,只需要处理 response.output_text.delta 和 response.completed 两个事件就够了。但如果你的应用涉及 Function Calling,就需要额外关注 response.function_call_arguments.delta 和相关事件。
流式 Function Calling
当模型在流式模式下决定调用函数时,事件流会稍有不同。模型不会一次性返回完整的函数名和参数 JSON,而是逐步推送函数调用的参数片段。你需要将这些片段拼接起来,在函数调用完成事件触发后解析完整的 JSON 并执行函数。
这种场景在构建 Agent 类应用时非常常见——模型可能需要先调用搜索 API 获取信息,再基于结果生成回答,整个过程都以流式方式进行。
python
from openai import OpenAI
import json
client = OpenAI()
tools = [{
"type": "function",
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"}
},
"required": ["city"]
}
}]
stream = client.responses.create(
model="gpt-4.1",
input="北京今天天气怎么样?",
tools=tools,
stream=True
)
function_args = ""
function_name = ""
for event in stream:
if event.type == "response.output_text.delta":
# 模型的文字回复
print(event.delta, end="", flush=True)
elif event.type == "response.function_call_arguments.delta":
# 逐步接收函数参数片段
function_args += event.delta
elif event.type == "response.function_call_arguments.done":
# 函数参数接收完毕,执行函数
args = json.loads(function_args)
print(f"\n[调用函数] get_weather({args})")
# 这里执行实际的函数调用
weather_result = {"city": args["city"], "temp": "22°C", "condition": "晴"}
# 将函数结果传回模型,继续流式生成
followup_stream = client.responses.create(
model="gpt-4.1",
input=[
{"role": "user", "content": "北京今天天气怎么样?"},
{"role": "function", "name": "get_weather", "content": json.dumps(weather_result, ensure_ascii=False)}
],
stream=True
)
for followup_event in followup_stream:
if followup_event.type == "response.output_text.delta":
print(followup_event.delta, end="", flush=True)
elif event.type == "response.completed":
print("\n\n--- 完成 ---")javascript
import OpenAI from "openai";
const client = new OpenAI();
const tools = [{
type: "function",
name: "get_weather",
description: "获取指定城市的天气信息",
parameters: {
type: "object",
properties: {
city: { type: "string", description: "城市名称" }
},
required: ["city"]
}
}];
const stream = await client.responses.create({
model: "gpt-4.1",
input: "北京今天天气怎么样?",
tools,
stream: true,
});
let functionArgs = "";
for await (const event of stream) {
if (event.type === "response.output_text.delta") {
process.stdout.write(event.delta);
} else if (event.type === "response.function_call_arguments.delta") {
functionArgs += event.delta;
} else if (event.type === "response.function_call_arguments.done") {
const args = JSON.parse(functionArgs);
console.log(`\n[调用函数] get_weather(${JSON.stringify(args)})`);
// 执行函数并将结果传回
const weatherResult = { city: args.city, temp: "22°C", condition: "晴" };
const followupStream = await client.responses.create({
model: "gpt-4.1",
input: [
{ role: "user", content: "北京今天天气怎么样?" },
{ role: "function", name: "get_weather", content: JSON.stringify(weatherResult) }
],
stream: true,
});
for await (const followupEvent of followupStream) {
if (followupEvent.type === "response.output_text.delta") {
process.stdout.write(followupEvent.delta);
}
}
} else if (event.type === "response.completed") {
console.log("\n\n--- 完成 ---");
}
}在上面的示例中,函数调用的流程分为三个阶段:首先通过 response.function_call_arguments.delta 事件逐步收集参数 JSON 片段;然后在 response.function_call_arguments.done 事件触发时,解析完整的 JSON 参数并执行实际的函数调用;最后将函数返回结果作为新的输入传回模型,让模型基于函数结果继续生成文本回答。
在 Web 应用中使用 Streaming
在前端 Web 应用中使用流式传输时,通常需要在后端创建一个代理端点,接收 OpenAI 的 SSE 流并转发给前端。这样可以避免在前端暴露 API Key。
以下是一个典型的前后端配合示例:
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
app = FastAPI()
client = OpenAI()
@app.post("/api/chat")
async def chat(prompt: str):
def generate():
stream = client.responses.create(
model="gpt-4.1",
input=prompt,
stream=True
)
for event in stream:
if event.type == "response.output_text.delta":
yield f"data: {event.delta}\n\n"
elif event.type == "response.completed":
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")javascript
async function streamChat(prompt) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ prompt }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
const outputEl = document.getElementById("output");
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split("\n");
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") {
console.log("生成完成");
return;
}
outputEl.textContent += data;
}
}
}
}后端使用 FastAPI 的 StreamingResponse 将 OpenAI 的事件流转换为标准的 SSE 格式转发给前端。前端通过 ReadableStream 逐步读取数据并更新 DOM。这种架构简单可靠,是目前 AI 聊天应用最主流的实现方式。
最佳实践
💡Streaming 开发技巧
性能优化:
- 使用异步客户端:在 Python 中优先使用
AsyncOpenAI,配合async for遍历事件流。异步模式不会阻塞事件循环,特别适合 Web 服务器等并发场景。 - 合理的 UI 更新频率:在前端逐字显示时,不要每收到一个 delta 事件就触发 DOM 更新或 React 重渲染。可以使用
requestAnimationFrame或 debounce 将多个 delta 合并后再更新 UI,避免性能问题。 - 设置合理的超时:流式连接可能因网络问题而长时间无响应。建议设置连接超时(如 30 秒)和空闲超时(如 10 秒无新事件),超时后重试或提示用户。
代码组织:
- 封装事件处理器:不要把所有逻辑写在一个大循环里。将不同事件类型的处理逻辑封装成独立的函数或类方法,代码会更清晰、更易维护。
- 累积完整文本:虽然流式输出的文本是分片发送的,但建议在客户端维护一个完整的文本变量,每次 delta 都追加进去。这样在需要完整文本时(如保存到数据库)不必等待
response.output_text.done事件。 - 善用
response.completed事件:这个事件携带完整的响应对象,包括 token 用量统计。用它来记录日志、计算费用、更新配额。
错误处理
⚠️流式传输中的错误处理
流式传输中的错误处理比同步模式复杂,因为错误可能发生在连接的任何阶段:
- 连接阶段错误:API Key 无效、模型不存在、请求参数错误等。这些错误在连接建立前就会抛出,可以用常规的
try/catch捕获。 - 传输中断:网络不稳定、服务器端错误、token 超限等都可能导致流在中途断开。此时客户端会收到一个
error事件或连接直接关闭。 - 超时无响应:服务器可能因负载过高而长时间不推送新事件。
推荐的错误处理策略:
python
from openai import OpenAI, APIError, APIConnectionError, RateLimitError
client = OpenAI()
try:
stream = client.responses.create(
model="gpt-4.1",
input="你好",
stream=True
)
for event in stream:
if event.type == "response.output_text.delta":
print(event.delta, end="", flush=True)
elif event.type == "error":
print(f"\n[服务端错误] {event.message}")
break
except APIConnectionError:
print("网络连接失败,请检查网络后重试")
except RateLimitError:
print("请求频率超限,请稍后重试")
except APIError as e:
print(f"API 错误: {e.status_code} - {e.message}")关键原则:
- 始终用
try/except(或try/catch)包裹整个流式处理逻辑。 - 对
error类型的事件单独处理,它携带服务端推送的错误信息。 - 实现指数退避重试:第一次失败等 1 秒,第二次等 2 秒,第三次等 4 秒,以此类推。避免在高负载时雪崩式重试。
- 在前端应用中,及时向用户展示错误状态,而不是让界面无限"加载中"。提供"重试"按钮让用户主动重新发起请求。
学习检查
0% 完成
理解 Streaming 的核心价值:降低感知延迟、提升用户体验
掌握 SSE 事件流的完整生命周期和各事件触发顺序
能用 Python 和 JavaScript 实现基本的流式文本输出
了解流式 Function Calling 的参数拼接和多轮交互流程
能在 Web 应用中实现后端代理转发 SSE 流给前端
掌握流式传输中的错误处理和重试策略