Skip to content

Streaming

在与大语言模型交互时,等待整个响应生成完毕再一次性返回(同步模式)会带来明显的延迟感——用户可能需要等待数秒甚至更长时间才能看到第一个字。Streaming(流式传输) 解决的正是这个问题:模型一边生成内容,一边将结果以事件流(Server-Sent Events, SSE)的形式逐步推送给客户端。

流式传输带来的核心价值有三个。第一是感知延迟大幅降低:用户在几百毫秒内就能看到第一个字开始输出,而不是干等几秒。第二是更好的用户体验:类似 ChatGPT 的逐字打字效果,让用户感觉 AI 正在"思考并回答",互动感更强。第三是支持实时处理:你可以在模型还在生成的过程中就开始处理已接收到的内容,比如实时翻译、实时搜索联想等场景。

本章将详细介绍 Streaming 的工作原理、核心事件类型、Python 和 JavaScript 的实现方式,以及在 Function Calling 等复杂场景下的流式处理技巧。


SSE 事件流程

流式传输基于 Server-Sent Events (SSE) 协议。当你开启 stream=True 后,API 不再返回一个完整的 JSON 响应,而是通过一个持久的 HTTP 连接逐步推送一系列事件。每个事件都有一个 type 字段标识其类型,客户端根据事件类型做出相应处理。

下面是一次典型的流式响应中,事件的触发顺序:

SSE 流式传输事件流程
response.created
响应对象创建
response.in_progress
开始生成
response.output_text.delta
逐字输出文本片段
response.output_text.done
文本输出完成
response.completed
整个响应完成

在这个流程中,response.output_text.delta 事件会被触发多次——每生成几个字(或一个 token)就推送一次。这是你在客户端实现"逐字打字效果"的核心事件。而 response.completed 事件标志着整个生成过程结束,此时你可以执行清理操作,比如更新 UI 状态、记录日志或统计 token 用量。


基础实现

开启流式传输非常简单,只需在 API 调用时加上 stream=True(Python)或 stream: true(JavaScript)参数即可。SDK 会自动处理底层的 SSE 连接和事件解析,你只需要遍历事件流并根据事件类型做出响应。

以下是 Python 和 JavaScript 两种语言的基本实现。注意 Python 中使用 for event in stream 遍历,而 JavaScript 中使用 for await...of 异步遍历。

python
from openai import OpenAI

client = OpenAI()

stream = client.responses.create(
    model="gpt-4.1",
    input="写一首关于春天的短诗",
    stream=True
)

for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        print("\n\n--- 生成完成 ---")
javascript
import OpenAI from "openai";
const client = new OpenAI();

const stream = await client.responses.create({
    model: "gpt-4.1",
    input: "写一首关于春天的短诗",
    stream: true,
});

for await (const event of stream) {
    if (event.type === "response.output_text.delta") {
        process.stdout.write(event.delta);
    } else if (event.type === "response.completed") {
        console.log("\n\n--- 生成完成 ---");
    }
}
python
from openai import AsyncOpenAI
import asyncio

client = AsyncOpenAI()

async def main():
    stream = await client.responses.create(
        model="gpt-4.1",
        input="写一首关于春天的短诗",
        stream=True
    )

    async for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
        elif event.type == "response.completed":
            print("\n\n--- 生成完成 ---")
            # 获取 token 使用情况
            usage = event.response.usage
            print(f"输入 token: {usage.input_tokens}")
            print(f"输出 token: {usage.output_tokens}")

asyncio.run(main())

在上面的代码中,最关键的是对 response.output_text.delta 事件的处理。每次收到这个事件时,event.delta 包含一小段新生成的文本(通常是几个字符到一个词),我们用 print(..., end="", flush=True)process.stdout.write() 立即输出,不换行、不缓冲,实现逐字显示效果。

response.completed 事件在所有内容生成完毕后触发一次,它携带完整的 response 对象,包括 token 使用统计等元信息。这是执行收尾工作的理想时机。


核心事件类型

在流式传输中,你会遇到多种事件类型。了解每种事件的触发时机和携带数据,才能正确实现各种功能。以下是最常用的事件类型及其用途:

事件类型 触发时机 常用字段 用途
response.created响应创建时response初始化 UI
response.output_text.delta每个文本片段delta实时显示文本
response.output_text.done文本完成text获取完整文本
response.function_call_arguments.delta函数参数片段delta流式函数调用
response.completed响应完成response清理和收尾
error出错时message错误处理

在实际开发中,你并不需要处理所有事件类型。对于最简单的文本输出场景,只需要处理 response.output_text.deltaresponse.completed 两个事件就够了。但如果你的应用涉及 Function Calling,就需要额外关注 response.function_call_arguments.delta 和相关事件。


流式 Function Calling

当模型在流式模式下决定调用函数时,事件流会稍有不同。模型不会一次性返回完整的函数名和参数 JSON,而是逐步推送函数调用的参数片段。你需要将这些片段拼接起来,在函数调用完成事件触发后解析完整的 JSON 并执行函数。

这种场景在构建 Agent 类应用时非常常见——模型可能需要先调用搜索 API 获取信息,再基于结果生成回答,整个过程都以流式方式进行。

python
from openai import OpenAI
import json

client = OpenAI()

tools = [{
    "type": "function",
    "name": "get_weather",
    "description": "获取指定城市的天气信息",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string", "description": "城市名称"}
        },
        "required": ["city"]
    }
}]

stream = client.responses.create(
    model="gpt-4.1",
    input="北京今天天气怎么样?",
    tools=tools,
    stream=True
)

function_args = ""
function_name = ""

for event in stream:
    if event.type == "response.output_text.delta":
        # 模型的文字回复
        print(event.delta, end="", flush=True)

    elif event.type == "response.function_call_arguments.delta":
        # 逐步接收函数参数片段
        function_args += event.delta

    elif event.type == "response.function_call_arguments.done":
        # 函数参数接收完毕,执行函数
        args = json.loads(function_args)
        print(f"\n[调用函数] get_weather({args})")

        # 这里执行实际的函数调用
        weather_result = {"city": args["city"], "temp": "22°C", "condition": "晴"}

        # 将函数结果传回模型,继续流式生成
        followup_stream = client.responses.create(
            model="gpt-4.1",
            input=[
                {"role": "user", "content": "北京今天天气怎么样?"},
                {"role": "function", "name": "get_weather", "content": json.dumps(weather_result, ensure_ascii=False)}
            ],
            stream=True
        )

        for followup_event in followup_stream:
            if followup_event.type == "response.output_text.delta":
                print(followup_event.delta, end="", flush=True)

    elif event.type == "response.completed":
        print("\n\n--- 完成 ---")
javascript
import OpenAI from "openai";
const client = new OpenAI();

const tools = [{
    type: "function",
    name: "get_weather",
    description: "获取指定城市的天气信息",
    parameters: {
        type: "object",
        properties: {
            city: { type: "string", description: "城市名称" }
        },
        required: ["city"]
    }
}];

const stream = await client.responses.create({
    model: "gpt-4.1",
    input: "北京今天天气怎么样?",
    tools,
    stream: true,
});

let functionArgs = "";

for await (const event of stream) {
    if (event.type === "response.output_text.delta") {
        process.stdout.write(event.delta);

    } else if (event.type === "response.function_call_arguments.delta") {
        functionArgs += event.delta;

    } else if (event.type === "response.function_call_arguments.done") {
        const args = JSON.parse(functionArgs);
        console.log(`\n[调用函数] get_weather(${JSON.stringify(args)})`);

        // 执行函数并将结果传回
        const weatherResult = { city: args.city, temp: "22°C", condition: "晴" };

        const followupStream = await client.responses.create({
            model: "gpt-4.1",
            input: [
                { role: "user", content: "北京今天天气怎么样?" },
                { role: "function", name: "get_weather", content: JSON.stringify(weatherResult) }
            ],
            stream: true,
        });

        for await (const followupEvent of followupStream) {
            if (followupEvent.type === "response.output_text.delta") {
                process.stdout.write(followupEvent.delta);
            }
        }

    } else if (event.type === "response.completed") {
        console.log("\n\n--- 完成 ---");
    }
}

在上面的示例中,函数调用的流程分为三个阶段:首先通过 response.function_call_arguments.delta 事件逐步收集参数 JSON 片段;然后在 response.function_call_arguments.done 事件触发时,解析完整的 JSON 参数并执行实际的函数调用;最后将函数返回结果作为新的输入传回模型,让模型基于函数结果继续生成文本回答。


在 Web 应用中使用 Streaming

在前端 Web 应用中使用流式传输时,通常需要在后端创建一个代理端点,接收 OpenAI 的 SSE 流并转发给前端。这样可以避免在前端暴露 API Key。

以下是一个典型的前后端配合示例:

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

@app.post("/api/chat")
async def chat(prompt: str):
    def generate():
        stream = client.responses.create(
            model="gpt-4.1",
            input=prompt,
            stream=True
        )
        for event in stream:
            if event.type == "response.output_text.delta":
                yield f"data: {event.delta}\n\n"
            elif event.type == "response.completed":
                yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
javascript
async function streamChat(prompt) {
    const response = await fetch("/api/chat", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ prompt }),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    const outputEl = document.getElementById("output");

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split("\n");

        for (const line of lines) {
            if (line.startsWith("data: ")) {
                const data = line.slice(6);
                if (data === "[DONE]") {
                    console.log("生成完成");
                    return;
                }
                outputEl.textContent += data;
            }
        }
    }
}

后端使用 FastAPI 的 StreamingResponse 将 OpenAI 的事件流转换为标准的 SSE 格式转发给前端。前端通过 ReadableStream 逐步读取数据并更新 DOM。这种架构简单可靠,是目前 AI 聊天应用最主流的实现方式。


最佳实践

💡Streaming 开发技巧

性能优化:

  • 使用异步客户端:在 Python 中优先使用 AsyncOpenAI,配合 async for 遍历事件流。异步模式不会阻塞事件循环,特别适合 Web 服务器等并发场景。
  • 合理的 UI 更新频率:在前端逐字显示时,不要每收到一个 delta 事件就触发 DOM 更新或 React 重渲染。可以使用 requestAnimationFrame 或 debounce 将多个 delta 合并后再更新 UI,避免性能问题。
  • 设置合理的超时:流式连接可能因网络问题而长时间无响应。建议设置连接超时(如 30 秒)和空闲超时(如 10 秒无新事件),超时后重试或提示用户。

代码组织:

  • 封装事件处理器:不要把所有逻辑写在一个大循环里。将不同事件类型的处理逻辑封装成独立的函数或类方法,代码会更清晰、更易维护。
  • 累积完整文本:虽然流式输出的文本是分片发送的,但建议在客户端维护一个完整的文本变量,每次 delta 都追加进去。这样在需要完整文本时(如保存到数据库)不必等待 response.output_text.done 事件。
  • 善用 response.completed 事件:这个事件携带完整的响应对象,包括 token 用量统计。用它来记录日志、计算费用、更新配额。

错误处理

⚠️流式传输中的错误处理

流式传输中的错误处理比同步模式复杂,因为错误可能发生在连接的任何阶段:

  • 连接阶段错误:API Key 无效、模型不存在、请求参数错误等。这些错误在连接建立前就会抛出,可以用常规的 try/catch 捕获。
  • 传输中断:网络不稳定、服务器端错误、token 超限等都可能导致流在中途断开。此时客户端会收到一个 error 事件或连接直接关闭。
  • 超时无响应:服务器可能因负载过高而长时间不推送新事件。

推荐的错误处理策略:

python
from openai import OpenAI, APIError, APIConnectionError, RateLimitError

client = OpenAI()

try:
    stream = client.responses.create(
        model="gpt-4.1",
        input="你好",
        stream=True
    )
    for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
        elif event.type == "error":
            print(f"\n[服务端错误] {event.message}")
            break
except APIConnectionError:
    print("网络连接失败,请检查网络后重试")
except RateLimitError:
    print("请求频率超限,请稍后重试")
except APIError as e:
    print(f"API 错误: {e.status_code} - {e.message}")

关键原则:

  • 始终用 try/except(或 try/catch)包裹整个流式处理逻辑。
  • error 类型的事件单独处理,它携带服务端推送的错误信息。
  • 实现指数退避重试:第一次失败等 1 秒,第二次等 2 秒,第三次等 4 秒,以此类推。避免在高负载时雪崩式重试。
  • 在前端应用中,及时向用户展示错误状态,而不是让界面无限"加载中"。提供"重试"按钮让用户主动重新发起请求。

学习检查

0% 完成