vercel-ai 部分源码阅读,看看如何实现流式请求高并发

4 分钟
AI源码阅读

背景

在开发 AI 工具的时候,自己使用 fetch 调用 AI 的 api ,然后发现在并发的场景下会出现问题,经过询问得知社区的开源项目: https://github.com/vercel/ai ,直接能够解决我的问题,使用下来确实是这样,于是打算研究一下它是如何实现的

目的

找到源码中 streamText 的实现,然后找到它是如何支持并发请求的

streamText 功能解析

这个 API 的官方文档: https://sdk.vercel.ai/docs/reference/ai-sdk-core/stream-text

它接受的输入:

  • model:使用的语言模型
  • system:将作为提示部分内容的系统消息。
  • tools: 可供模型访问和调用的工具集。模型需支持工具调用功能。
  •  prompt:简单文本提示。prompt 和 messages 参数不可同时使用。
  • messages: 消息列表。prompt 和 messages 参数不可同时使用。
  • 还有很多。。。 它返回的输出:
  • textStream:一个仅返回生成文本增量的文本流。您可将其作为 AsyncIterable 或 ReadableStream 使用。当发生错误时,该流会抛出错误。
  • 还有很多。。。 除了上面的参数还有很多参数,但是我们的目的是搞清楚它发送请求的逻辑,所以就不一一列举了!

源码阅读

这里我的目标是找到如何发送请求,处理数据,然后如何返回内容的

1)安装源码

git clone git@github.com:vercel/ai.git

安装好后,我使用 Cursor 打开

2)找到源码地址

我知道我是从 ai 里面导出的 streamText ,所以它肯定在 ai 目录下,于是我找到了,它在这里:vercel/ai/packages/ai/core/generate-text/stream-text.ts 。但是居然高达 1600 行,我还是去玩游戏吧!(那是不可能的) 可以看到这里返回的是一个 DefaultStreamTextResult 的实例,这里代码逻辑过于复杂,借助 AI,我找到了相关代码部分: 这里可以看到它调用了 模型自己的 doStream 方法来获取结果,streamText 是负责准备请求数据,然后调用 model.doStream() => 处理返回的数据

所以我们去看模型自己如何写 doStream 方法,这里我看的是 openai ,我们去:packages/openai/src/openai-chat-language-model.ts ,这里我们可以看到: 这里调用 @ai-sdk/provider-utils 的 postJsonToApi 方法,我们继续去packages/provider-utils/src/post-to-api.ts 然后可以看到 postJsonToApi 返回 PostToApi ,所以我们看这个方法:

这就是我们要找的方法,我们详细看一下: 它接收:

  • url:目标API的URL
  • headers:请求头信息
  • body:请求体(包含content和values)
  • successfulResponseHandler:成功响应处理器
  • failedResponseHandler:失败响应处理器
  • abortSignal:中止信号
  • fetch = getOriginalFetch(): fetch实现(默认为全局fetch) 它返回:将 response 传给 successfulResponseHandler。也就是说它返回的是 successfulResponseHandler 函数执行的返回 然后: 1)发送 http 请求 2)提取、处理响应头 3)错误处理,这里通过多步处理然后抛出错误 4)成功处理,也通过 try catch 保证成功处理过程中错误能抛出 5)网络错误处理,处理 fetch 失败,并且设置可重试

代码解析:

  • 异步非阻塞:使用 async/await 实现异步操作,不会阻塞事件循环
  • 事件驱动:使用 promise ,当网络请求完成通过事件回调处理响应
  • 错误处理链:精细的错误处理机制,保证失败也能获取到精确的错误信息,或者重试

successfulResponseHandler

我们去 openai 看到传入了createEventSourceResponseHandler 函数给successfulResponseHandler ,createEventSourceResponseHandler 在 packages/provider-utils/src/response-handler.ts ,代码如下:

createEventSourceResponseHandler 是一个工厂函数,用于创建处理服务器发送事件(Server-Sent Events, SSE)格式响应的处理器,这里使用 ReadableStream.PipeThrough() 方法来进行数据处理

  • TextDecoderStram:将二进制转为文本
  • EventSourceParseStram:解析 SSE 格式
  • TransformStram:处理每个事件数据
  • 通过 enqueue() 方法将给定数据块送入到关联的流中 这种流式处理方式使得函数可以高效处理大量数据,而不必等待所有数据接收完毕。

函数优点:

  • 非阻塞处理:使用流式处理,不会阻塞事件循环
  • 增量处理:可以立即处理每个到达的数据块,无需等待完整响应
  • 内存效率:避免将整个响应加载到内存中
  • 类型安全:通过Zod模式确保每个数据块符合预期格式
  • 错误隔离:单个数据块解析错误不会影响整个流程

stream-text 流程图

flowchart TD
    A[用户调用 streamText] --> B[初始化参数和设置]
    B --> C[创建流处理管道]
    C --> D[执行 streamStep 函数]
    D --> E[调用 model.doStream 请求API]
    E --> F[处理流式响应]
    F --> G{是否需要后续步骤?}
    G -->|是| H[更新消息历史]
    H --> D
    G -->|否| I[关闭流并返回结果]

初始化:

flowchart TD
    A[用户调用 streamText] --> B[验证参数]
    B --> C[初始化承诺对象]
    C --> D[创建可拼接流]
    D --> E[设置流转换器]
    E --> F[准备遥测和重试设置]
    F --> G[标准化提示信息]

API 请求处理:

flowchart TD
    A[streamStep 函数] --> B[准备提示信息]
    B --> C[准备工具选择]
    C --> D[应用模型设置]
    D --> E[使用retry函数包装请求]
    E --> F["model.doStream 发起API请求"]
    
    subgraph "实际API调用"
    F --> G[provider包中的doStream实现]
    G --> H[调用 postToApi/postJsonToApi]
    H --> I[发起实际HTTP请求]
    end
    
    I --> J[返回流式响应]

流处理管道:

flowchart TD
    A[获取API响应流] --> B[应用工具转换]
    B --> C[创建事件处理器]
    
    subgraph "转换流"
    C --> D[处理文本增量]
    C --> E[处理推理内容]
    C --> F[处理工具调用]
    C --> G[处理步骤完成]
    end
    
    H[合并所有转换流] --> I[输出到最终结果]

多步处理与工具调用:

flowchart TD
    A[接收步骤结果] --> B{检查结束原因}
    B -->|工具调用| C[执行工具]
    B -->|长度限制| D[继续生成]
    B -->|完成| E[结束流]
    
    C --> F[处理工具结果]
    F --> G[准备下一步输入]
    G --> H[执行下一步]
    
    D --> I[继续当前消息]
    I --> J[执行下一步]

数据输出处理:

flowchart TD
    A[流处理完成] --> B[解析最终结果]
    B --> C[分离不同类型数据]
    
    subgraph "输出流转换"
    C --> D[textStream]
    C --> E[fullStream]
    C --> F[dataStream]
    end
    
    G[用户选择输出格式] --> H[返回相应格式数据]

数据流动过程 用户输入 → 格式化为模型可以理解的提示 API 请求 → 使用 postToApi 函数发起 HTTP 请求 流式返回 → 通过 TransformStream 处理返回的数据块 转换处理 → 根据类型(文本、工具调用、推理等)处理每个数据块 结果整合 → 将所有处理后的数据整合为最终结果 多步处理机制 代码支持多步骤处理,通过以下方式: 处理工具调用结果并根据需要发起新的 API 请求 当模型输出被截断时(finishReason 为 "length")可以继续生成 维护消息历史,将前面步骤的输出作为后续步骤的输入的一部分 这种设计使 SDK 能够支持复杂的对话和工具使用场景,实现更智能的交互体验。


此文自动发布于:github issues