写在前面:

本文章是关于《LLM Engineer’s Handbook》的部分学习笔记。原书以开发一个名为 LLM Twin 的、模仿某人的写作风格的端到端项目,展示如何运用 LLM 工程和 MLOps ,构建一个最小可行产品以解决特定问题。其中,作者使用较多笔墨来介绍 LLM 在工程化中的开发方法论,对于原理的探讨着墨较少。所以在以原书为主线的前提下会添加部分对于原理的介绍,作为对于方法论的补充说明。碍于本人学识有限,部分叙述难免存在纰漏,请读者注意甄别。感谢所有学者、工程师的开源帮助。

参考资料:


零、前言

大型语言模型(Large Language Model, LLM)工程领域迅速崛起,已成为人工智能和机器学习中的关键领域。随着 LLM 持续革新自然语言处理和生成技术,能够在实际场景中有效实施、优化和部署这些模型的专业人士需求呈指数级增长。LLM 工程涵盖从数据准备模型微调推理优化生产部署等广泛学科,要求软件工程、机器学习专业知识和领域知识的独特融合。

机器学习运维(Machine Learning Operations, MLOps)在成功实施 LLM 于生产环境中起着至关重要的作用。MLOps 将 DevOps 的原则扩展至机器学习项目,专注于自动化和简化整个 ML 生命周期。对于 LLM 而言,由于其模型的复杂性和规模,MLOps 尤为重要。它解决了诸如管理大型数据集、处理模型版本控制、确保可重复性以及维持模型性能等挑战。通过融入 MLOps 实践,LLM 项目可以实现更高的效率、可靠性和可扩展性,最终促成更成功和有影响力的部署。

《LLM Engineer’s Handbook》是一本全面指南(以下简称“手册”),旨在将最佳实践应用于 LLM 工程领域。手册中涵盖数据工程、监督微调、模型评估、推理优化和检索增强生成(Retrieval-Augmented Generation, RAG)管道开发等主题。

为直观展示这些概念,手册中将开发一个名为 LLM Twin 的端到端项目,目标是模仿某人的写作风格和个性。该用例将展示如何运用 LLM 工程和 MLOps ,构建一个最小可行产品以解决特定问题。我们读者将了解:

  • 如何为 LLM 收集和准备数据、针对特定任务微调模型、优化推理性能以及实施RAG管道;
  • 如何评估LLM性能、使模型与人类偏好对齐;
  • 部署基于LLM的应用程序。

本笔记与手册结构保持一致,主要分为以下几个内容:

  • 第1章 理解 LLM Twin 的概念和架构:介绍了贯穿全手册的 LLM Twin 项目,作为生产级 LLM 应用的端到端示例。定义了构建可扩展机器学习系统的 FTI 架构,并将其应用于 LLM Twin 的用例;
  • 第2章 工具和安装:介绍用于构建实际 LLM 应用的 Python、MLOps 和云工具,如编排器、实验跟踪器、提示监控和 LLM 评估工具。展示了如何在本地安装和使用这些工具以进行测试和开发;
  • 第3章 数据工程:介绍一个数据收集管道的实现,该管道从 MediumGitHubSubstack 等多个网站抓取数据,并将原始数据存储在数据仓库中。强调了在实际机器学习应用中,从动态来源收集原始数据的重要性,而非依赖静态数据集;
  • 第4章 RAG 特征管道:介绍了检索增强生成(Retrieval-Augmented Generation, RAG)的基本概念,如 Embeddings、基础 RAG 框架、向量数据库,以及如何优化 RAG 应用。通过使用软件最佳实践,设计并实现了 LLM Twin 的 RAG 特征管道,以应用 RAG 理论;
  • 第5章 监督微调(Supervised Fine-Tuning, SFT):探讨了使用**指令(instruction)-回答(answer)**对来优化预训练语言模型以执行特定任务的过程。涵盖了创建高质量数据集、实施全量微调(full fine-tuning)、LoRA 和 QLoRA 等微调技术,并提供了在自定义数据集上微调 Llama 3.1 8B 模型的实践示范;
  • 第6章 带偏好对齐的微调:介绍了将语言模型与人类偏好对齐的技术,重点关注直接偏好优化(Direct Preference Optimization, DPO)。涵盖了创建自定义偏好数据集、实施 DPO,并提供了使用 Unsloth 库对 TwinLlama-3.1-8B 模型进行对齐的实践示范;
  • 第7章 评估LLM:详细描述了评估语言模型和 LLM 系统性能的各种方法。介绍通用和特定领域的评估,讨论流行的基准测试。本章包括对 TwinLlama-3.1-8B 模型的多标准实践评估;
  • 第8章 推理优化:涵盖了关键的优化策略,如推测解码、模型并行和权重量化。将讨论如何提高推理速度、降低延迟和最小化内存使用,介绍流行的推理引擎并比较其特性;
  • 第9章 RAG推理管道:通过从头开始实施自查询、重排序和过滤向量搜索等方法,探索高级 RAG 技术。涵盖了设计和实现 LLM Twin 的 RAG 推理管道,以及类似于 LangChain 等流行框架的自定义检索模块;
  • 第10章 推理管道部署:介绍了在线、异步和批量推理等机器学习部署策略,有助于将微调后的 LLM Twin 模型架构并部署到AWS SageMaker,并构建 FastAPI 微服务,将 RAG 推理管道作为 RESTful API 公开;
  • 第11章 MLOps和LLMOps:介绍了 LLMOps 的概念,从其在 DevOps 和 MLOps 中的根源开始。本章解释了如何将 LLM Twin 项目部署到云端,如将机器学习管道部署到 AWS,并展示了如何使用 Docker 容器化代码和构建 CI/CD/CT 管道。还在 LLM Twin 的推理管道上添加了提示监控层;
  • 附录 MLOps原则:涵盖了用于构建可扩展、可重复和健壮的机器学习应用的六项 MLOps 原则。

一、理解 LLM Twin 的概念和架构

手册将教我们如何构建一个 LLM Twin,即一个通过将特定个人的写作风格、语气和个性融入 LLM 的 AI 角色。通过这个示例,我们将经历完整的 ML 生命周期,从数据收集到部署和监控。在实现 LLM Twin 的过程中学到的大多数概念都可以应用于其他基于 LLM 或 ML 的应用程序。

The best way to learn about LLMs and production machine learning (ML) is to get your hands dirty and build systems.

学习LLM和生产级机器学习(ML)的最佳方式是亲自动手构建系统。

在开始实施新产品时,从工程的角度来看,我们必须在开始构建之前经过三个规划步骤:

  • 首先,了解我们试图解决的问题以及我们想要构建的内容至关重要。在我们的案例中,LLM Twin 究竟是什么?为什么要构建它?这一步是我们必须思考并专注于**“WHY”**的地方。
  • 其次,为了反映现实世界的场景,我们将设计一个具有最小功能的产品的初级版本。在这里,我们必须清楚地定义产品所具有的核心功能。这些选择是基于时间表、资源和团队知识做出的。这是我们在构思阶段和实际实施之间架起桥梁,并最终回答以下问题:“我们要构建什么(WHAT)?”。
  • 最后,我们将进行系统设计步骤,列出用于构建 LLM 系统的核心架构和设计选型。前两个步主要与产品的设计相关,而最后一个是技术性的,专注于“HOW”。

这三个步骤是在构建现实世界产品时自然发生的。虽然前两个不需要太多的 ML 知识,但是这对于产品的开发而言同样至关重要。简而言之,本章涵盖以下主题:

  • 理解 LLM Twin 概念
  • 规划 LLM Twin 产品的最小可行产品(Minimum Viable Product, MVP)
  • 使用特征/训练/推理管道构建 ML 系统
  • 设计 LLM Twin 的系统架构

什么是 LLM Twin

LLM Twin 是一个将个性化的写作风格、语气融入大型语言模型(LLM)中的 AI 角色。与在整个互联网数据上训练的通用 LLM 不同,LLM Twin 是在个人数据上进行微调的,将这些个人数据“投射(projected)”到大语言模型中。

[!NOTE]

这里有意使用了“投射(projected)”一词。正如其他投射一样,在此过程中丢失大量信息,大模型只能反映出训练数据中信息

如果我们用鲁迅的文学作品微调 LLM,LLM 将会模仿鲁迅的写作风格,这也被称为风格迁移。我们将利用风格迁移策略,使 LLM 模仿我们个人的风格。

为了将 LLM 调整为特定的风格和语气,除了微调外,我们还将利用各种高级的检索增强生成(RAG)技术,以使用我们先前的 Embedding 来调节自回归过程。我们将在后续章节中详细探讨这些内容。

[!TIP]

什么是嵌入 Embedding ?

在 LLM 的开发领域中,向量 Embedding 在获取文本信息的本质方面起着关键作用。向量 Embedding 的核心是指在数学空间中将单词、句子甚至整个文档表示为密集的低维向量的过程。与依赖于稀疏表示(如one-hot编码)的传统方法不同,向量 Embeddings封装了单词之间的语义关系,并使算法能够理解它们的上下文含义。

通过使用词 Embeddings、句子 Embeddings 或上下文 Embedding 等技术,向量 Embeddings 提供了文本数据的紧凑而有意义的表示。例如,单词 Embeddings 将单词映射到固定长度的向量,其中具有相似含义的单词在向量空间中的位置更接近。这允许高效的语义搜索、信息检索和语言理解任务。

向量 Embedding 的重要性在于它能够将原始文本转换为算法可以理解和推理的数字表示。这种转换过程不仅促进了各种自然语言处理(NLP)任务,而且还作为大型语言模型的基本构建块。向量 Embeddings 使这些模型能够利用嵌入在文本数据中的丰富语义信息,使它们能够生成更连贯和上下文更合适的响应。

我们可以想象这样一个可以对 LLM 进行微调的场景:

  • 小红书、知乎等社交平台:使 LLM 仿照我们自己的风格来编写社交媒体内容;
  • 学术论文和文章:微调 LLM 以撰写正式和学术性的内容;
  • 代码:微调 LLM 使其以特定的代码规范来编写代码。

所有上述场景都可以归结为一个核心策略:收集个人数据集(或其中的一部分),使用不同的算法将其输入到 LLM 中。最终,LLM 将反映所收集数据的语气和风格。

为什么不用 Qwen 这些通用大模型?

Qwen 这些通用大模型非常通用、缺乏独特表达,且往往冗长。盲目使用通用大模型可能会导致以下问题:

  • 幻觉导致的错误信息:需要手动检查生成内容是否存在幻觉,或使用第三方工具进行验证,是一项繁琐且低效的任务。
  • 繁琐的手动提示工程:需要手动编写提示词并注入外部信息,这个过程既耗时又麻烦。此外,由于无法完全控制提示词和输入数据,在不同会话中生成一致的答案也十分困难。虽然可以通过 API 和 LangChain 等工具部分解决此问题,但这需要一定的编程经验。

如果想要高质量且真正有价值的内容时,我们可能会花比直接写作更多的时间去调试 AI 生成的文本。

由此可见,构建私人的大语言模型的关键点在于:

  • 我们收集哪些数据
  • 如何预处理这些数据
  • 如何将数据输入 LLM
  • 如何链接多个提示以获得理想结果
  • 如何评估生成的内容

规划产品的 MVP

既然我们已经了解了什么是 LLM Twin 以及为什么要构建它,那么我们需要明确定义产品的功能。手册中重点关注 LLM Twin 的第一版,即最小可行产品(Minimum Viable Product, MVP),以遵循大多数产品的自然发展周期。

什么是 MVP?

MVP 指的是产品的最小可行版本,即仅包含足够功能来吸引早期用户,并在开发的初始阶段验证产品概念的可行性。通常,MVP 的目标是以最小的投入从市场中收集反馈

MVP 是一种产品策略,主要有以下优势:

  • 加速产品上市(Accelerated time-to-market):快速推出产品,以获得早期用户并建立市场影响力。
  • 验证产品理念(Idea validation):在全面开发产品之前,通过真实用户进行测试,以验证产品是否符合需求。
  • 市场调研(Market research):深入了解目标用户的偏好,收集有价值的市场反馈。
  • 降低风险(Risk minimization):减少因产品市场表现不佳而浪费的时间和资源。

在 MVP 中,必须严格遵循 “V”(Viable,可行性) 的原则,即产品必须是可行的。即使产品功能最小化,它也必须提供完整的用户体验,而不是半成品。MVP 需要是一个真正可用的产品,提供流畅的使用体验,让用户愿意持续使用,并随着产品的发展而发展。

LLM Twin 的 MVP 的核心功能

为了保持简单性,我们的 LLM Twin MVP 将具备以下核心功能:

  1. 数据收集
    • 小红书、知乎、微信 和 GitHub 账户收集用户的数据。
  2. LLM 训练微调
    • 使用开源 LLM,结合收集的数据进行微调(fine-tuning)。
  3. RAG(检索增强生成)
    • 将收集的数字数据存入向量数据库(vector database),以支持 RAG 机制。
  4. 社交媒体内容生成(例如小红书文章)
    • 用户输入的提示(prompts)
    • RAG 检索,复用并引用用户过往内容
    • 新内容(如文章、论文等)作为 LLM 额外的知识输入
  5. 简单的 Web 界面,提供交互能力:
    • 配置社交媒体链接,并触发数据收集流程
    • 输入提示词(prompts)或外部资源链接,让 LLM Twin 生成内容

MVP 的关键挑战

尽管上述 MVP 可能看起来功能不多,但我们必须确保系统具备以下特性

  • 成本可控(Cost-effective):优化计算资源,避免不必要的开销。
  • 可扩展(Scalable):随着用户增长,系统仍能稳定运行。
  • 模块化(Modular):方便未来扩展和优化。

至此,我们已经从用户和商业角度探讨了LLM Twin 的价值。最后一步,我们需要从工程实现的角度进行分析,并制定开发计划,明确如何在技术层面实现这个系统

[!NOTE]

从现在开始,重点将转向 LLM Twin 的具体实现。即使我们专注于上述核心功能,我们仍会基于最新的 LLM 研究成果,并结合最佳的软件工程与 MLOps 实践,构建一个成本可控、可扩展的 LLM 应用

构建具有特征/训练/推理流水线的 ML 系统

在深入探讨 LLM Twin 架构的具体细节之前,我们需要先理解其核心 ML 体系结构模式——特征/训练/推理(Feature/Training/Inference , FTI)架构。本节将概述 FTI 流水线 的设计,以及它如何帮助我们构建一个结构化的 ML 应用。

ML 系统开发的挑战

构建生产级 ML 系统不仅仅是训练一个模型。从工程角度来看,训练模型通常是最简单的一步。然而,决定正确的架构和超参数,才是让模型真正发挥作用的挑战——这更像是一个研究问题,而不是纯粹的工程问题。

当前,我们关注的重点是如何设计一个可用于生产的架构即使训练出了高准确率的模型,仅仅基于静态数据集训练它,距离真正的部署仍然很遥远。 我们需要考虑以下问题:

  • 数据处理:如何**摄取(ingest)、清理(clean)和验证(validate)**新的数据?
  • 训练 vs 推理环境:训练和推理(Inference)环境是否需要分开部署计算资源 如何分配?
  • 特征存储与计算:如何在正确的环境计算并提供模型所需的特征?
  • 模型部署与服务:如何高效、低成本地提供推理服务?如何版本化、追踪并共享数据集和模型?
  • 监控与维护:如何监控 ML 基础设施和模型的表现?模型如何扩展并持续更新
  • 自动化:如何自动化模型的部署和训练流程?

[!TIP]

这些问题通常由 ML 或 MLOps 工程师 负责解决,而 研究团队或数据科学团队 主要关注模型训练本身。

Google Cloud 团队提出的 成熟 ML & MLOps 系统 需要包括的组件如下:

screenshot_2025-03-10_15.43.53

  • ML 代码(核心模型开发)
  • 数据收集(Data Collection)
  • 数据验证(Data Verification)
  • 测试与调试(Testing & Debugging)
  • 资源管理(Resource Management)
  • 模型分析(Model Analysis)
  • 流程 & 元数据管理(Process & Metadata Management)
  • 服务基础设施(Serving Infrastructure)
  • 监控系统(Monitoring)

可见,生产化 ML 模型远远不只是写好训练代码这么简单,它涉及多个环节和工程实践。

如何构建一个统一的 ML 系统?

关键问题:如何将所有这些组件连接成一个统一的 ML 系统?我们需要设计一个标准化的架构,使 ML 系统的搭建更加高效、可复用和可扩展。

在传统软件工程中,很多应用可以拆分为 数据库(DB)、业务逻辑(Business Logic)和用户界面(UI) 三大部分。尽管每个部分的实现可能非常复杂,但在高层次的架构设计上,它们仍然可以归纳为这三大模块。

那么,ML 应用是否也能有类似的通用架构呢?

我们需要先回顾一些现有方案,看看它们为什么不适合构建可扩展的 ML 系统,然后再探索更优的解决方案。

以往解决方案的问题

screenshot_2025-03-10_15.47.41

在上图中,我们可以看到大多数 ML 应用程序中常见的架构。这种架构基于单体批处理(monolithic batch)模式,将特征创建(Create Features)、**模型训练(Train Model)推理(Make Predictions)**紧密耦合在同一个组件中。

采用这种方法可以快速解决 ML 领域中的一个关键问题——训练-推理偏差(training-serving skew)

  • 训练-推理偏差 发生在训练时和推理时使用的特征计算方式不同,导致模型在生产环境中的表现不如预期。
  • 在这种单体架构中,训练和推理阶段的特征是用相同的代码生成的,因此避免了训练-推理偏差

单体批处理架构适用于小数据集,因为:

  • 训练、推理使用相同的特征计算代码,避免了训练-推理偏差
  • 通过**批处理(batch mode)**定期运行流水线
  • 预测结果通常被**第三方应用(如 dashboard)**消费

然而,这种架构在面对更大规模的数据时,会引发许多问题

  • 特征无法复用(既不能在系统内部复用,也不能被其他系统使用)
  • 扩展性差,如果数据规模增加,必须重构代码以支持 PySparkRay
  • 性能优化困难,如果想用 C++、Java 或 Rust 重写推理模块,会变得极为复杂
  • 团队协作受限,由于特征计算、训练和推理紧耦合在一起,难以拆分给不同的团队
  • 不支持流式计算,如果需要实时训练,无法切换到流式架构
单体架构在实时推理系统中的问题

screenshot_2025-03-10_15.57.35

在上图中,我们可以看到类似的架构被应用于实时推理系统时会带来的额外问题。

在实时推理中,为了生成预测,我们必须通过客户端请求传输整个状态,以便计算特征并输入模型。例如,在电影推荐系统中,理想情况下,我们只需传递 userID 给模型,模型可以基于存储的用户数据计算推荐结果。但在单体架构中,我们必须传递整个用户状态,包括姓名、年龄、性别、观影历史等,使得客户端必须理解如何访问这些状态数据。

这种方法极易出错,因为:

  • 客户端和模型服务 强耦合,客户端必须知道如何查询和构造数据
  • 状态传输成本高,尤其在高并发情况下,传输大量状态信息会影响性能

另一个例子是 LLM + RAG(检索增强生成) 的实现:

  • RAG 模型 中,我们希望能基于外部知识库增强 LLM 的推理能力。
  • 如果没有向量数据库(vector DB),我们必须在每次查询时手动附带所有文档,否则模型无法参考这些外部知识。
  • 这样就导致客户端需要手动查询和管理文档,这不仅不现实,而且是一种反模式(antipattern)

客户端不应负责查询和计算特征,而应交由服务端处理。

我们将在第 8 章和第 9 章详细介绍 RAG 这一技术。

综上所述,我们的核心问题是如何在不依赖客户端传递完整特征的情况下进行预测

另一端的极端案例,Google Cloud提供了一种**生产就绪(production-ready)**的、自动化流水线的 ML 架构(见下图)。

screenshot_2025-03-10_16.10.48

这种架构确实能够解决生产环境中的 ML 部署问题,但它存在以下挑战:

  • 复杂度高,不够直观,非 ML 生产专家很难理解
  • 上手难度大,如果你没有丰富的 ML 生产部署经验,可能会被架构的复杂性劝退
  • 不易渐进式扩展,难以理解如何从小型系统开始并随着需求增长逐步扩展

在接下来的章节,我们将介绍 特征/训练/推理(Feature/Training/Inference , FTI)架构,它是一种直观的 ML 设计,能够有效解决前述的核心问题

特征/训练/推理(FTI) 架构

[!TIP]

想了解更多关于 FTI 模式的信息,可以参考*“From MLOps to ML Systems with Feature/Training/Inference Pipelines”* by Jim Dowling, CEO and co-founder of Hopsworks:https://www.hopsworks.ai/post/mlops-to-ml-systems-with-fti-pipelines

特征/训练/推理(Feature/Training/Inference , FTI)架构提出了一个清晰直接的思维框架,任何团队或个人都可以遵循它,来完成特征计算、模型训练以及推理管道的部署。该模式表明,任何机器学习系统都可以归结为三个管道:

  • 计算特征*(Feature)*
  • 训练模型*(Training)*
  • 进行推理*(Inference)*

这种架构强大之处在于,我们可以清晰地定义每个管道的职责和接口。最终,系统只有三个核心模块,而不是像 Google Cloud 方案中展示的那种拥有二十个模块的复杂结构,这大大简化了操作和定义的难度。下图展示了特征、训练和推理管道架构。

screenshot_2025-03-10_16.11.00

FTI 架构的核心特点

  • 每个管道都是独立的组件,可以在不同进程或硬件上运行
  • 每个管道可以使用不同的技术实现,甚至可以由不同的团队开发和维护
  • 可扩展性强,允许团队根据实际需求对不同管道独立扩展
  • 提供清晰的思维导图,帮助团队高效组织 ML 系统架构

特征管道(Feature Pipeline)

作用
特征管道的主要任务是从原始数据中提取特征,并生成用于模型训练或推理的特征和标签。但这些特征不会直接传递给模型,而是**存储在特征库(Feature Store)**中。

主要职责

  • 存储、版本管理、追踪、共享 训练和推理所需的特征。
  • 保持特征的状态,确保训练和推理阶段使用的特征一致,从而避免训练-推理偏差(Training-Serving Skew)
  • 让训练和推理管道轻松获取数据,保证系统的稳定性和可复现性

训练管道(Training Pipeline)

作用
训练管道的任务是**从特征库中提取特征和标签,训练模型,并将训练好的模型存储在模型仓库(Model Registry)**中。

主要职责

  • 训练一个或多个模型,并存储、版本管理、追踪和共享 这些模型。

  • 模型仓库(Model Registry) 的角色类似于特征库,但重点是管理模型,而不是特征。

  • 记录元数据(Metadata Store)

    ,包括:

    • 训练使用的特征、标签及其版本,确保模型的可追溯性。
    • 确保团队可以随时知道模型的训练数据,方便调试和迭代。

推理管道(Inference Pipeline)

作用
推理管道的任务是使用特征库中的特征数据和模型仓库中的训练模型进行推理,并生成最终的预测结果。

主要职责

  • 支持批量(Batch)或实时(Real-time)推理:
    • 批量模式:预测结果存入数据库(DB)。
    • 实时模式:预测结果直接返回给客户端。
  • 版本管理:特征、标签、模型的版本都是可追踪的,这意味着可以灵活地升级或回滚模型部署
  • 动态调整模型与特征的连接关系:
    • 例如,M1模型M_{1} 可能使用f1特征f_1f2特征f_2f3特征f_3,而 M2模型M_{2} 可能使用 f2特征f_2f3特征f_3f4特征f_4
    • 通过版本管理,我们可以快速切换或调整特征与模型的映射关系

设计 LLM Twin 的系统结构

需求分析

系统需要具备以下数据处理能力:

  • 数据采集:自动化并定期小红书、知乎 和 GitHub(如果可行) 抓取数据。
  • 数据存储与标准化:统一格式化爬取的数据,并存入数据仓库(Data Warehouse)
  • 数据清理:处理 噪声数据、重复数据和异常数据,确保数据质量。
  • 指令数据集(Instruction Dataset)构建:生成 用于微调 LLM 的训练数据集
  • 数据向量化与存储切分(Chunking)和嵌入(Embedding) 清理后的数据。存储向量化数据到向量数据库(Vector DB),以支持 RAG

训练(Training)需求

  • 支持多种 LLM 微调

    • 支持 不同规模的 LLM(7B、14B、30B、70B 参数)
    • 能够基于 不同规模的指令数据集 进行微调。
    • 支持不同 LLM 模型类型(如 Mistral、Llama、GPT 之间切换)。
  • 实验管理跟踪和比较 训练实验结果,优化模型效果。

  • 自动化训练自动启动 训练任务,当新的 指令数据集可用 时触发训练流程。在部署前 测试潜在的 生产 LLM 候选模型,确保高质量推理能力。

推理(Inference)需求

  • REST API 接口:提供 REST API,允许客户端与 LLM Twin 交互。

  • 实时访问向量数据库(Vector DB)支持 RAG,确保推理时可以实时检索相关知识数据。

  • 多模型推理能力支持不同规模的 LLM 进行推理,适应不同业务场景。

  • 自动扩展(Auto-Scaling)根据用户请求负载自动扩展推理服务,优化计算资源分配。

  • 自动化部署通过评估机制,自动部署 通过测试的 LLM 版本,减少手动干预。

LLMOps 需求

  • 指令数据集管理支持版本控制数据 lineage 追踪数据集复用,提高数据可管理性。

  • 模型管理支持模型版本控制模型 lineage 追踪模型复用,便于模型管理和回溯。

  • 实验追踪记录所有实验配置、结果和性能指标,确保可重复性和优化。

  • CI/CD + 持续训练(Continuous Training)支持 CT/CI/CD,即持续训练(CT)、持续集成(CI)和持续部署(CD)。

  • 提示词和系统监控:监控**提示词(Prompt)**的表现,防止偏差。系统监控,确保 LLM 服务稳定运行。

如何使用 FTI 管道设计 LLM Twin 架构

我们将系统拆分为四个核心组件。除了 FTI 的三大核心管道(特征、训练、推理)外,我们还必须实现数据管道

但在我们的场景下,我们的目标是在小团队中构建一个 MVP(最小可行产品),因此:我们必须同时实现数据收集和 FTI 管道。这种 端到端开发模式 在初创公司中非常常见,因为资源有限,无法分配独立团队。工程师需要 跨多个角色,视项目进度调整工作内容。即使未来团队扩展,理解端到端 ML 系统的架构仍然至关重要,有助于协同开发与优化。

screenshot_2025-03-11_10.13.07

数据收集管道(Data Collection Pipeline)

数据收集管道的任务是爬取你的个人数据,包括:小红书知乎(帖子、文章),GitHub(代码)

在架构上,该管道遵循ETL(提取-加载-转换)模式,即:

  • 提取(Extract):从社交媒体平台爬取数据;
  • 转换(Transform):对数据进行标准化处理;
  • 加载(Load):将数据存入数据仓库(NoSQL 数据库)

[!NOTE]

为什么使用 NoSQL 作为数据仓库?

由于我们处理的是文本数据,它天然是非结构化的,因此 NoSQL 数据库(如 MongoDB)是最佳选择。尽管 MongoDB 不是传统的关系型数据库,但在我们的架构中,它将充当数据库的角色,因为:

  • 它存储了标准化的原始数据,这些数据由 ETL 管道收集并可以直接用于 ML 训练
  • 它适合灵活存储和查询非结构化文本数据,便于下游管道访问和处理。

为了更好地处理数据,我们将爬取的数据分为三类

  • 文章(Articles) → 知乎
  • 帖子(Posts) → 小红书
  • 代码(Code) → GitHub

我们希望抽象化数据来源,即:在 LLM 训练或推理时,数据的来源不重要;但为了溯源和引用,我们会将**原始 URL 作为元数据(metadata)**存储。

数据处理、微调训练(Fine-tuning)和 RAG(检索增强生成) 的角度来看,知道数据类别比知道来源更重要

  • 例如,不同数据类型的切分(chunking)策略会有所不同:
    • 帖子(Post)文章(Article) 的分割方式不同。
    • 代码(Code) 需要额外的解析和上下文理解。

类别(category)而非来源(source)组织数据,能提高系统的扩展性:

  • 例如: 小红书的数据可以直接纳入Posts 类别,无须改动处理逻辑;而GitLab 的代码数据可以无缝集成到 Code 类别

特征管道(Feature Pipeline)

特征管道的核心作用是从数据仓库获取原始数据(文章、帖子、代码),进行处理后存入特征存储(Feature Store)FTI 设计模式的核心特点在此体现,但 LLM Twin 的特征管道有一些自定义特性

  • 针对三种数据类型(文章、帖子、代码)分别进行不同的处理
  • 包含三大核心步骤(清洗、切分、嵌入),用于微调 LLM 和 RAG(检索增强生成)
  • 创建两种数据快照:
    • 清洗后数据(用于 LLM 微调)
    • 嵌入(Embedding)后数据(用于 RAG):使用逻辑特征存储(Logical Feature Store),而非传统的专用特征存储。

逻辑特征存储:向量数据库(Vector DB)

在 RAG 系统中,向量数据库(Vector DB)是关键基础设施。向量数据库本质上是 NoSQL 数据库,可以按 ID 和集合名称(collection name) 访问数据点(datapoints)。我们可以查询 Vector DB 中的新数据,而不需要执行向量搜索(Vector Search)。处理后的数据会被封装为版本化、可追踪、可共享的处理后的数据(artifact)(关于 artifact 的细节将在第 2 章讨论)。

[!TIP]

What is an artifact in computer science?

To put it simply, an artifact is a by-product of software development. It’s anything that is created so a piece of software can be developed. This might include things like data models, diagrams, setup scripts — the list goes on.

(简单来说,Artifact 指的是一种软件开发的副产品。它指的是任何创建出来用以开发一套软件的一类东西,这其中也许包含了数据模型,图表,启动脚本等等。)

系统的其余部分将如何访问逻辑特征存储?**训练管道(Training Pipeline)**将指令数据集(Instruction Datasets)视为 artifact。推理管道(Inference Pipeline) 通过 向量搜索(Vector Search) 查询 Vector DB 以获取额外上下文信息。

对于我们的用例 LLM Twin,这已经足够了,因为:指令数据集(artifact)非常适合用于离线训练,而向量数据库是为在线访问而构建的,这是我们进行推理所需要的。

不过,在后续章节中,我们将解释如何**清理(cleaned)、分块(chunked)和嵌入(embedded)**这三个数据类别(文章、帖子和代码)。

训练管道(Training Pipeline)

训练管道的核心职责是 从特征存储(Feature Store)获取指令数据集(Instruct Dataset),微调 LLM,并将训练好的 LLM 权重存入模型注册表(Model Registry)。更具体地说:

  1. 触发训练:当逻辑特征存储中有新的指令数据集(artifact)可用时,我们将触发训练管道,使用工件并微调 LLM。
  2. 超参数优化:
    • 在初始阶段,数据科学团队负责这一步。他们通过自动或手动进行多次实验,以找到最适合的模型和超参数。
    • 为了比较和挑选最佳超参数集,记录所有有价值的东西,并在实验之间进行比较。
    • 最终,他们将挑选最佳超参数和微调后的 LLM,并将其作为 LLM 生产候选方案提出。然后将提议的 LLM 存储在模型注册表中。实验阶段结束后,我们存储并重复使用找到的最佳超参数。
  3. 如今,我们可以完全自动化训练过程,即持续训练(Continuous Training, CT)。我们的模块化设计使我们能够快速利用 ML 编排器来安排和触发不同的系统部分。例如,我们可以安排数据收集管道每周抓取数据。然后,当数据仓库中有新数据可用时,我们可以触发特征管道,当有新的指令数据集可用时,我们可以触发训练管道。

在将新模型推向生产之前,根据更严格的测试集对其进行评估至关重要,以确保最新候选模型比当前生产模型更好。如果此步骤通过,模型最终将被标记为已接受并部署到生产推理管道。即使在完全自动化的 ML 系统中,也建议在接受新的生产模型之前进行手动步骤。因此,在此阶段,专家会查看测试组件生成的报告。如果一切看起来都很好,它就会批准该模型,自动化可以继续。

关键技术问题

  • 如何设计一个 LLM 无关的训练管道?
  • 应该使用哪些微调技术?
  • 如何扩展微调算法,使其适用于不同规模的 LLM 和数据集?
  • 如何从多个实验中选取最优 LLM 作为生产候选?
  • 如何测试 LLM,以决定是否推送到生产环境?

在后面的章节中,我们将一一介绍解决方案。

推理管道(Inference Pipeline)

推理管道是 LLM 系统的最后一个核心组件,负责处理用户查询并返回答案。它连接 模型注册表(Model Registry)逻辑特征存储(Logical Feature Store),用于加载微调后的 LLM 并执行 RAG(检索增强生成,Retrieval-Augmented Generation)。

推理流程

  1. 加载模型:从模型注册表加载已微调的 LLM;从逻辑特征存储访问向量数据库(Vector DB),用于 RAG 查询。
  2. 接收客户端请求:通过 REST API 接收用户查询;解析查询并生成 RAG 任务。
  3. 执行 RAG 以增强 LLM 生成能力:使用 向量数据库进行检索(Vector Search),找到相关外部信息;结合 LLM 进行答案生成,返回最终响应。
  4. 监控与分析:所有用户查询、RAG 处理的增强提示(Enriched Prompts)和生成结果,都会发送至提示监控系统(Prompt Monitoring System)。监控系统 分析、调试 模型输出,优化 LLM 行为。可根据 特定需求 触发警报,执行手动或自动调整。

FTI 设计与 LLM Twin 架构的最终思考

FTI(Feature-Training-Inference)模式 并不需要严格遵循,它的核心作用是帮助清晰地设计 ML(机器学习)系统。例如,我们的系统并没有使用传统的特征存储(Feature Store),而是选择了 基于向量数据库(Vector DB)和指令数据集(Artifacts)逻辑特征存储(Logical Feature Store),因为这样更简单且成本更低。重点是提供一个可版本化(Versioned)且可复用(Reusable) 的训练数据集,而不是形式上的标准化存储。

计算资源需求及可扩展性

  • 数据收集管道 & 特征管道
    • 主要依赖 CPU 计算,对计算资源需求较低。
    • 基于 CPU & RAM 负载 水平扩展(Horizontal Scaling)
  • 训练管道
    • 需要 强大的 GPU 计算能力 来加载和微调 LLM。
    • 通过 增加 GPU 资源 垂直扩展(Vertical Scaling)
  • 推理管道
    • 计算需求介于数据管道和训练管道之间,需要 较强计算能力 以确保低延迟。
    • 基于 客户端请求数量 水平扩展(Horizontal Scaling)

推理管道直接面向用户,因此必须严格测试,确保延迟符合预期,从而提供良好的用户体验。FTI 设计使得计算资源的分配变得灵活,我们可以为不同的组件选择最合适的计算架构。

引用

  1. Dowling, J. (2024a, July 11). From MLOps to ML Systems with Feature/Training/Inference Pipelines. Hopsworks. https://www.hopsworks.ai/post/mlops-to-ml-systems-with-fti-pipelines
  2. Dowling, J. (2024b, August 5). Modularity and Composability for AI Systems with AI Pipelines and Shared Storage. Hopsworks. https://www.hopsworks.ai/post/modularity-and-composability-for-ai-systems-with-ai-pipelines-and-shared-storage
  3. Joseph, M. (2024, August 23). The Taxonomy for Data Transformations in AI Systems. Hop- sworks. https://www.hopsworks.ai/post/a-taxonomy-for-data-transformations-in-ai-systems
  4. MLOps: Continuous delivery and automation pipelines in machine learning. (2024, August 28). Google Cloud. https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
  5. Qwak. (2024a, June 2). CI/CD for Machine Learning in 2024: Best Practices to build, test, and Deploy | Infer. Medium. https://medium.com/infer-qwak/ci-cd-for-machine-learning-in-2024-best-practices-to-build-test-and-deploy-c4ad869824d2
  6. Qwak. (2024b, July 23). 5 Best Open Source Tools to build End-to-End MLOPs Pipeline in 2024. Medium. https://medium.com/infer-qwak/building-an-end-to-end-mlops-pipeline-with-open-source-tools-d8bacbf4184f
  7. Salama, K., Kazmierczak, J., & Schut, D. (2021). Practitioners guide to MLOPs: A framework for continuous delivery and automation of machine learning (1st ed.) [PDF]. Google Cloud. https://services.google.com/fh/files/misc/practitioners_guide_to_mlops_whitepaper.pdf

二、工具链与安装

[!CAUTION]

在手册中的对应章节介绍了使用的所有核心工具,特别是 LLM Twin 项目 的实现和部署所需的工具。因为个人偏好不同、对于工具链的选择亦有不同,所以在这里不做详细介绍,只会简要介绍手册作者推荐的工具链。

如果你熟悉这些工具,可以直接跳过本章。

在本章,我们不会深入讲解 LLM、RAG、MLOps 或 LLMOps 的概念,而是快速概览我们的技术栈和前置要求,避免后续章节重复讲解工具安装和选择原因。从 第 3 章开始,我们将正式进入 LLM Twin 的应用场景,并实现一个 ETL 数据收集流程,用于从互联网爬取数据。

本章内容概览

  • Python 生态工具
    • 如何管理多个 Python 版本
    • 创建虚拟环境
    • 安装固定版本的依赖项
    • 如何在本地安装 LLM-Engineers-Handbook 代码库(如果你想尝试代码,地址如下):GitHub Repo
  • MLOps & LLMOps 工具链
    • 介绍通用 MLOps 工具(如模型注册表)
    • 深入了解LLM 相关工具(如 LLM 评估和 Prompt 监控工具)
    • 使用 ZenML 进行 ML 管道管理(ML 与 MLOps 的桥梁)
  • 数据库管理
    • 介绍 NoSQL 和向量数据库 的使用
    • 如何使用 Docker 在本地运行所有组件
  • 云端环境准备(AWS)
    • 创建 AWS 账户并获取访问密钥
    • 安装 & 配置 AWS CLI,以便程序化管理云资源
    • 了解 SageMaker 及其在开源 LLM 训练和部署中的作用

Python 生态工具链安装

任何 Python 项目都需要三个基本工具:python 解释器依赖项管理任务执行工具。Python 解释器会按预期执行您的 Python 项目。手册中的所有代码都使用 python 3.11.8 进行了测试。

1
2
python --version
# Python 3.11.8

[!TIP]

我们可以从此处下载 python 解释器python.org

手册中推荐使用 poetry 工具来管理 python的依赖和虚拟环境,poethepoetPoetry 插件,用于管理 CLI 任务,替代 Makefileshell 脚本等;当然也可以使用其他的工具的组合替代,比如 conda(mini-conda)pip

MLOps & LLMOps 工具概览

本节简要介绍 MLOps(机器学习运维)和 LLMOps(大模型运维)工具,以及它们在 LLM Twin 项目中的作用。理论部分将在第 11 章深入讲解,而手册主要通过实战来展示这些工具的使用方式。

Hugging Face:模型注册库(Model Registry)

模型注册库 是一个 集中式存储,用于管理 ML 模型版本、元数据和性能指标。它在 MLOps 中起到关键作用:

  • 版本控制(Versioning)
  • 模型共享(Sharing)
  • 模型可追溯性(Traceability)
  • 集成 CI/CD 流水线(Continuous Deployment)

以下是一些常见的模型库:

  • Hugging Face Model Hub
    • 社区丰富,但中国大陆境内无法直接访问 ❌
    • 适用于:开源社区、NLP、大模型(LLMs)
    • 官网https://huggingface.co
    • 特点
      1. 大模型(LLM)生态支持,可存储 Transformer、Diffusion 等 AI 模型。
      2. 提供 可视化 UI,可管理模型版本、推理 demo(Spaces)。
      3. 易于与 PyTorch、TensorFlow、JAX 等框架集成。
      4. 适用于 团队协作社区共享,支持 私有模型库
  • AWS SageMaker Model Registry
    • 适用于: 企业级云端 MLOps,企业、金融、医疗等对安全性要求高的 MLOps 场景。
    • 官网: aws.amazon.com/sagemaker/
    • 特点:
      1. AWS SageMaker 生态完美结合,支持 训练-注册-部署-监控 整套流程。
      2. 提供 自动化 CI/CD,模型更新后可自动部署。
      3. 高安全性,支持 IAM 权限管理,适用于企业级 ML 部署。
  • ModelScope
    • 阿里云推出的开源 AI 模型平台
    • 适用于:适用于 大模型(LLM)、计算机视觉(CV)、自然语言处理(NLP) 等 AI 任务
    • 官网https://modelscope.cn/
    • 特点
      1. 支持 1,000+ 预训练模型,包括大语言模型(LLM)、计算机视觉(CV)、语音处理(Speech)、多模态(Multimodal)等
      2. 一键调用 AI 工作流(Pipelines),快速搭建 AI 任务
      3. 免费在线体验,支持模型下载、本地部署、API 调用
对比项 魔搭 ModelScope Hugging Face Model Hub
适用地区 中国大陆境内可用 🚫 需科学上网
模型数量 ⭐ 1,000+ ⭐ 10,000+
大模型支持 ✅ 通义千问、ChatGLM、Qwen ✅ LLaMA、GPT-3、Falcon
微调(Fine-tuning) ✅ LoRA, QLoRA, P-Tuning ✅ LoRA, PEFT, DPO, Unsloth
推理服务(API) ✅ 免费调用 ✅ 需付费
工作流(Pipelines) ✅ 一键执行 🚫 需手写代码,可与 LLMOps 集成
私有化部署 ✅ 企业可自建 ✅ 需自建服务器

[!NOTE]

手册中使用的 Hugging Face 模型:

ZenML:MLOps 工作流编排器

[!NOTE]

TODO

CometML:可视化实验跟踪

[!NOTE]

TODO

Opik:评估、测试和监控大型语言模型

[!NOTE]

TODO

非结构化数据库与向量数据库

MongoDB:NoSQL

[!NOTE]

TODO

Qdrant:向量数据库

[!NOTE]

TODO

云端环境准备(AWS)

[!NOTE]

TODO

三、数据工程

本章将深入探讨 LLM Twin 项目,学习如何设计和实现数据收集流水线,以获取用于 LLM 任务(如微调或推理)的原始数据。由于本书并非专门介绍数据工程,因此本章内容将保持精简,仅关注收集必要原始数据的关键部分。从第 4 章开始,我们将重点讨论 LLM 和生成式 AI,深入研究其理论和具体实现细节。

在处理项目或研究时,我们通常会使用一个静态数据集。但在 LLM Twin 项目中,我们希望模拟真实世界的场景,在其中主动收集和整理数据。因此,构建数据流水线将帮助我们了解端到端机器学习项目的工作方式。本章将讲解如何设计和实现 ETL(提取、转换、加载)流水线,从 社交平台爬取数据,并将其存储到 MongoDB 数据库。我们将介绍各种爬取方法,标准化数据,并将其加载到数据仓库中。

本章的主要内容包括:

  1. 设计数据收集流水线
    • 介绍 LLM Twin 的数据收集架构
    • 解析 ETL 流水线的设计
  2. 实现数据收集流水线
    • 使用 ZenML 作为流程编排工具
    • 构建爬虫,并实现 调度层(根据 URL 域名实例化对应爬虫类)
    • 按最佳软件开发实践,开发每个爬虫模块
  3. 数据仓库管理
    • MongoDB 之上构建数据层,统一管理文档结构
    • 查询并交互数据

最后,我们将学习如何使用 ZenML 运行数据收集流水线,并查询 MongoDB 中的数据

设计数据收集管道

在深入实施之前,我们必须了解 LLM Twin 的数据收集 ETL 架构,如下图所示。我们需要探究从哪些平台抓取数据,以及如何设计数据结构和流程。但是,第一步是了解我们的数据收集管道如何映射到 ETL 流程。

screenshot_2025-03-11_15.25.00

ETL 管道涉及三个基本步骤:

  1. 我们从各种来源提取数据。我们将从内容平台抓取数据以收集原始数据。
  2. 我们通过清理和标准化这些数据,将其转换为适合存储和分析的一致格式。
  3. 我们将转换后的数据加载到数据仓库或数据库中。

对于我们的项目,我们使用 MongoDB 作为我们的 NoSQL 数据仓库。虽然这不是标准方法,但我们很快就会解释这种选择背后的原因。
我们想要设计一个 ETL 管道,输入一个用户和一个链接列表作为输入。之后,它会单独抓取每个链接,标准化收集到的内容,并将其保存在 MongoDB 数据仓库中该特定作者下。

数据收集流水线的输入和输出

  • 输入:用户(作者)及其提供的一组链接。
  • 输出:存储在 MongoDB 数据仓库中的原始文档列表。

[!NOTE]

我们将交替使用用户作者,因为在 ETL 管道的大多数情况下,用户是提取内容的作者。但是,在数据仓库中,我们只有一个用户集合。

screenshot_2025-03-11_15.38.12ETL 管道将检测每个链接的域名,并根据该域调用专门的爬虫。我们为三个不同的数据类别实现了四个不同的爬虫,如上图所示。首先,我们收集的所有文档都可以归结为文章、存储库(或代码)和帖子。数据来自哪里并不重要。我们主要对文档的格式感兴趣。在大多数情况下,我们必须以不同的方式处理这些数据类别。因此,我们为每个实体创建了一个不同的域实体,每个实体在 MongoDB 中都有自己的类和集合。当我们将源 URL 保存在文档的元数据中时,我们仍然会知道它的来源,并可以在 GenAI 用例中引用它。

爬虫类型 目标数据源 输出文档类型 主要步骤
Medium 爬虫 Medium 文章 文章(Article) 登录 Medium → 爬取 HTML → 解析并清理文本 → 存入数据库
通用文章爬虫 Substack / 个人博客等 文章(Article) 爬取 HTML → 解析并清理文本 → 存入数据库
GitHub 爬虫 GitHub 仓库 代码仓库(Repository) 克隆代码仓库 → 解析文件树 → 处理代码文件 → 存入数据库
LinkedIn 爬虫 LinkedIn 个人动态 帖子(Post) 登录 LinkedIn → 爬取用户动态 → 解析 HTML → 存入数据库

在下一节中,我们将详细研究每个爬虫的实现。现在,请注意,每个爬虫都以特定方式访问特定平台或站点并从中提取 HTML。之后,所有爬虫都会解析 HTML,从中提取文本,并对其进行清理和规范化,以便可以将其存储在同一个接口下的数据仓库中。

通过将所有收集的数据减少到三种数据类别,文章(Article)代码仓库(Repository)帖子(Post),而不是为每个新数据源创建新的数据类别,我们可以轻松地将此架构扩展到多个数据源,而无需付出太多重复适配工作。例如,如果我们想开始从 X 收集数据,我们只需要实现一个输出帖子文档的新爬虫,仅此而已。其余代码将保持不变。否则,如果我们在类和文档结构中引入源维度,我们将不得不向所有下游层添加代码以支持任何新数据源。例如,我们必须为每个新源实现一个新的文档类,并调整功能管道以支持它。

对于我们的概念验证,抓取几百个文档就足够了,但如果我们想将其扩展到实际产品,我们可能需要更多数据源来抓取。LLM 需要大量数据,通常需要数千个文档才能获得理想的结果,而不仅仅是几百个文档。但在许多项目中,实现一个不是最准确的端到端项目版本并在以后对其进行迭代是一种很好的策略。因此,通过使用这种架构,可以在未来的迭代中轻松添加更多数据源以收集更大的数据集。下一章将介绍有关 LLM 微调和数据集大小的更多信息。

ETL 过程如何连接到特征管道?特征管道从 MongoDB 数据仓库中提取原始数据,进一步清理,将其处理为特征,并将其存储在 Qdrant 向量数据库中,以使 LLM 训练和推理管道可以访问它。第 4 章提供了有关特征管道的更多信息。ETL 过程独立于特征管道。这两个管道严格通过 MongoDB 数据仓库相互通信。因此,数据收集管道可以为 MongoDB 写入数据,而功能管道可以独立地按照不同的时间表从中读取数据。

为什么我们使用 MongoDB 作为数据仓库?

  • 适用于小规模数据:本项目的文档量较小,MongoDB 能够很好地处理。
  • 适合非结构化文本:爬取的数据主要是非结构化文本,MongoDB 不强制模式(Schema),使开发更灵活。
  • 易用性:MongoDB 提供直观的 Python SDK,官方提供 Docker 镜像云端免费层,适合本项目的 PoC(概念验证)。
  • 未来可扩展性:如果数据量增大(如达到百万级别),可以切换到 SnowflakeBigQuery 这样的专用数据仓库。

实现 LLM Twin 的数据收集流水线

LLM Twin 项目的每个流水线的入口都是一个 ZenML 流水线,且可以通过 YAML 文件在运行时进行配置,并通过 ZenML 生态系统执行。因此,我们从 ZenMLdigital_data_etl 流水线开始,仔细分析其实现方式。你会注意到,这正是我们在第 2 章中用来演示 ZenML 的示例流水线。不过,这次我们将深入探讨其实现,并解释数据收集的具体细节。理解流水线的工作原理后,我们将探讨每个爬虫的实现,它们分别用于从不同网站收集数据并存储到 MongoDB 数据仓库中。

[!CAUTION]

这里手册的作者使用 ZenML 作为 LLMOps 的整体框架,使用 zenml.pipeline 作为流水线管道的具体实现。这里我们将重点放在流水线的工程方法,而非具体的实现中(框架无关)。如果读者需要了解 ZenML 框架内的相关实现,请在手册中第 65 页查找详情。

流水线与步骤

在流水线的实现中,它的输入是用户的全名和一组链接,这些链接将由该用户(即该链接的内容作者)进行爬取。在函数内,我们调用了两个步骤:首先,我们根据全名查找用户。接着,我们遍历所有链接并逐个爬取。

1
2
3
4
5
@pipeline
def digital_data_etl(user_full_name: str, links: list[str]) -> str:
user = get_or_create_user(user_full_name) # step1
last_step = crawl_links(user=user, links=links) # step2
return last_step.invocation_id

接下来,我们将分别探讨 get_or_create_usercrawl_links 这两个步骤:

  1. 以用户的全名作为输入,并尝试从 MongoDB 数据库中查找该用户,如果用户不存在,则创建一个新的用户;

    1
    2
    3
    4
    5
    6
    7
    8
    @step
    def get_or_create_user(user_full_name: str) -> Annotated[UserDocument, "user"]:
    logger.info(f"Getting or creating user: {user_full_name}")
    first_name, last_name = utils.split_user_full_name(user_full_name)
    user = UserDocument.get_or_create(first_name=first_name, last_name=last_name)
    step_context = get_step_context()
    step_context.add_output_metadata(output_name="user", metadata=_get_metadata(user_full_name, user)) # matadata
    return user

    在这里,我们还定义了一个辅助函数 _get_metadata(),它构建了一个包含查询参数和检索到的用户信息的字典,这些信息将作为元数据添加到用户输出 artifact :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    def _get_metadata(user_full_name: str, user: UserDocument) -> dict:
    return {
    "query": {
    "user_full_name": user_full_name,
    },
    "retrieved": {
    "user_id": str(user.id),
    "first_name": user.first_name,
    "last_name": user.last_name,
    },
    }
  2. 接下来是 crawl_links 步骤,它用于收集提供的链接中的数据。在此函数中,我们初始化了一个爬虫分发器 CrawlerDispatcher,并配置它以处理特定的域名,如 GitHub 等:

    1
    2
    3
    4
    5
    @step
    def crawl_links(user: UserDocument, links: list[str]) -> Annotated[list[str], "crawled_links"]:
    dispatcher = CrawlerDispatcher.build().register_github()
    logger.info(f"Starting to crawl {len(links)} link(s).")

    然后,该函数初始化了存储输出元数据的变量,并计数成功的爬取次数。它遍历每个链接,尝试爬取并提取数据,并更新成功的爬取次数和链接的元数据:

    1
    2
    3
    4
    5
    6
    7
    metadata = {}
    successfull_crawls = 0
    for link in tqdm(links):
    successfull_crawl, crawled_domain = _crawl_link(dispatcher, link, user)
    successfull_crawls += successfull_crawl
    metadata = _add_to_metadata(metadata, crawled_domain, successfull_crawl)

    处理完所有链接后,函数将累积的元数据添加到输出 artifact 中:

    1
    2
    3
    4
    step_context = get_step_context()
    step_context.add_output_metadata(output_name="crawled_links", metadata=metadata)
    logger.info(f"Successfully crawled {successfull_crawls} / {len(links)} links.")
    return links

    在上述的函数中有两个辅助函数 _crawl_link_add_to_metadata_crawl_link 尝试使用合适的爬虫来提取每个链接的信息,处理任何可能发生的异常,并返回一个元组,表示爬取是否成功以及链接的域名:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def _crawl_link(dispatcher: CrawlerDispatcher, link: str, user: UserDocument) -> tuple[bool, str]:
    crawler = dispatcher.get_crawler(link)
    crawler_domain = urlparse(link).netloc
    try:
    crawler.extract(link=link, user=user)
    return (True, crawler_domain)
    except Exception as e:
    logger.error(f"An error occurred while crawling: {e!s}")
    return (False, crawler_domain)

    _add_to_metadata 用于更新元数据字典,记录每个域名的爬取成功与总数:

    1
    2
    3
    4
    5
    6
    def _add_to_metadata(metadata: dict, domain: str, successfull_crawl: bool) -> dict:
    if domain not in metadata:
    metadata[domain] = {}
    metadata[domain]["successful"] = metadata.get(domain, {}).get("successful", 0) + successfull_crawl
    metadata[domain]["total"] = metadata.get(domain, {}).get("total", 0) + 1
    return metadata

爬虫分发器 CrawlerDispatcher 的实现

正如上面提到的,CrawlerDispatcher 类会根据每个链接的域名来确定该使用哪个爬虫。

screenshot_2025-03-12_11.12.34

爬虫的提取逻辑被封装在 extract() 方法中。例如,如果提供的链接属于 https://github.com,它会创建一个 GithubCrawler 实例来爬取该平台的数据。接下来,我们深入探讨 CrawlerDispatcher 的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import BaseCrawler, GithubCrawler, CustomArticleCrawler

class CrawlerDispatcher:
def __init__(self) -> None:
# 存储域名(如 guthub.com)到 爬虫类(如 GithubCrawler)的映射关系。
self._crawlers = Dict[str, Type[BaseCrawler]] = {}

@classmethod
def build(cls) -> "CrawlerDispatcher":
dispatcher = cls()
return dispatcher

def register_github(self) -> "CrawlerDispatcher":
""" 注册一个新的爬虫类型 """
self.register("https://github.com", GithubCrawler)
return self

def register(self, domain:str, crawler:type[BaseCrawler]) -> None:
parsed_domain = urlparse(domain)
domain = parsed_domain.netloc
self._crawlers[r"https://(www\.?{}/*)".format(re.eacape(domain))] = crawler

def get_crawler(self, url:str) -> BaseCrawler:
""" 根据 URL 解析出域名,并实例化正确的爬虫 """
for pattern, crawler in self._crawlers.items():
if re.match(pattern, url):
return crawler()
else:
logger.warning(f"No crawler found for {url}. Defaulting to CustomArticleCrawler")
return CustomArticleCrawler()

爬虫(Crawlers)

在深入探讨各个爬虫的具体实现之前,我们需要先介绍它们的 基类 BaseCrawler

基类定义了一个统一的接口,使得所有爬虫都遵循相同的结构。我们之所以能够实现分发层(Dispatcher Layer),正是因为所有爬虫都遵循相同的方法签名。

基类 BaseCrawler

现在,我们来看 BaseCrawler 的实现,它定义了所有爬虫必须实现的方法:

1
2
3
4
5
6
7
8
9
10
from abc import ABC, abstractmethod

class BaseCrawler(ABC):
""" 所有爬虫的基类,定义了通用接口 """
model: type[NoSQLBaseDocument]

@abstractmethod
def extract(self, link: str, **kwargs):
""" 爬取数据并存入数据库 """
pass

解析

  1. BaseCrawler 继承 ABC(抽象基类),确保不能直接实例化它。
  2. extract() 是一个 抽象方法,所有具体爬虫都必须实现它。

这样,我们可以在不更改 CrawlerDispatcher 代码的情况下,轻松 扩展新爬虫

拓展爬虫:基于 Selenium 的爬虫基类

BaseCrawler 基础上,我们进一步扩展出了 BaseSeleniumCrawler,用于 自动化浏览器操作,以便爬取需要动态加载或需要登录的网站(如 Medium、LinkedIn)。

[!NOTE]

为什么使用 Selenium?

  1. 支持动态内容加载:许多现代网站使用 JavaScript 加载内容,普通的 HTTP 请求(如 requests)无法抓取完整数据,而 Selenium 可以模拟用户操作,触发 JavaScript 代码
  2. 支持登录:对于 需要用户认证 的网站(如 LinkedIn),Selenium 可以模拟用户输入账号密码并自动登录
  3. 支持交互:Selenium 可以执行点击、滚动、表单填写等操作,使其更适用于复杂网页的爬取。
  4. 使用 Selenium-based 的爬虫,必须先在本机安装 Chrome (或者其他基于 chromium 内核的浏览器)

简述

  • 代码使用 SeleniumChromeDriver 初始化程序设置 Web 爬取所需的导入和配置
  • chromedriver_autoinstaller 确保安装适当版本的 ChromeDriver 并将其添加到系统路径,从而与已安装的 Google Chrome 浏览器版本(或其他基于 Chromium 的浏览器)保持兼容性。
  • Selenium 将使用 ChromeDriver 与浏览器通信并打开无头会话模式,我们可以在其中以编程方式操作浏览器以访问各种 URL、单击特定元素(例如按钮)或滚动浏览新闻源。
  • 使用 chromedriver_autoinstaller,我们确保始终安装与我们机器的 Chrome 浏览器版本匹配的正确 ChromeDriver 版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import time
from tempfile import mkdtemp
import chromedriver_autoinstaller
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from llm_engineering.domain.documents import NoSQLBaseDocument
# Check if the current version of chromedriver exists
# and if it doesn't exist, download it automatically,
# then add chromedriver to path
chromedriver_autoinstaller.install()

class BaseSeleniumCrawler(BaseCrawler, ABC):
def __init__(self, scroll_limit:int=5) -> None:
chromedriver_autoinstaller.install() # 自动安装 ChromeDriver 并初始化

options = webdriver.ChromeOptions()
options.add_argument("--no-sandbox")
options.add_argument("--headless=new") # 无头模式,浏览器界面不可见
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--log-level=3")
options.add_argument("--disable-popup-blocking")
options.add_argument("--disable-notifications")
options.add_argument("--disable-extensions")
options.add_argument("--disable-background-networking")
options.add_argument("--ignore-certificate-errors")
options.add_argument(f"--user-data-dir={mkdtemp()}")
options.add_argument(f"--data-path={mkdtemp()}")
options.add_argument(f"--disk-cache-dir={mkdtemp()}")
options.add_argument("--remote-debugging-port=9226")

self.set_extra_driver_options(options) # 子类可以自定义额外的选项
self.scroll_limit = scroll_limit
self.driver = webdriver.Chrome(options=options)

def set_extra_driver_options(self, options: Options) -> None:
""" 子类可以重写此方法,添加额外的浏览器选项 """
pass

def login(self) -> None:
""" 子类可以重写此方法,实现登录逻辑 """
pass

def scroll_page(self) -> None:
""" 滚动页面,直到达到滚动限制 """
current_scroll = 0
last_height = self.driver.execute_script("return document.body.scrollHeight")
while True:
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(5)
new_height = self.driver.execute_script("return document.body.scrollHeight")
if new_height == last_height or (self.scroll_limit and current_scroll >= self.scroll_limit):
break
last_height = new_height
current_scroll += 1

至此,我们已经定义了两个爬虫基类:BaseCrawlerBaseSeleniumCrawler。下一步我们将通过继承这两个基类,从而实现具体的爬虫:

  • GithubCrawler(BaseCrawler)
  • CustomArticleCrawler(BaseCrawler)
  • MediumCrawler(BaseSeleniumCrawler)

Github 爬虫类 GithubCrawler

GitHubCrawler 类继承自 BaseCrawler,旨在爬取 GitHub 仓库的内容。与其他需要 Selenium 的爬虫不同,由于 GitHub 支持 Git 的 clone 功能,爬虫无需通过浏览器模拟登录或操作,而是直接利用 Git 将仓库克隆到本地进行内容提取。下面是这个类的详细实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class GithubCrawler(BaseCrawler):
model = RepositoryDocument # MongoDB 中定义的代码仓库类型的内容
def __init__(self, ignore=(".git", ".toml", ".lock", ".png")) -> None:
'''ignore 参数定义了要忽略的标准文件和目录类型。例如 .git, .toml, .lock, .png 等。这样可以确保爬虫在处理 GitHub 仓库时不会抓取这些不需要的文件。'''
super().__init__()
self._ignore = ignore # 文件过滤规则,忽略标准文件和目录

@override
def extract(self, link: str, **kwargs) -> None:
# 检查仓库是否已经在数据库中存在,避免重复存储
old_model = self.model.find(link=link)
if old_model is not None:
logger.info(f"Repository already exists in the database: {link}")
return

try:
logger.info(f"Starting scrapping GitHub repository: {link}")
repo_name = link.rstrip("/").split("/")[-1]
local_temp = tempfile.mkdtemp()
os.chdir(local_temp)

# 自进程执行克隆 GitHub 仓库
subprocess.run(["git", "clone", link])

# 获取克隆后的仓库路径
repo_path = os.path.join(local_temp, os.listdir(local_temp)[0]) # 获取仓库目录路径
tree = {} # 用于存储仓库文件内容的字典

# 遍历仓库目录,跳过忽略的文件和目录
for root, _, files in os.walk(repo_path):
dir = root.replace(repo_path, "").lstrip("/")
if dir.startswith(self._ignore):
continue

for file in files:
if file.endswith(self._ignore):
continue
file_path = os.path.join(dir, file)
# 读取文件内容,去除空格并存储
with open(os.path.join(root, file), "r", errors="ignore") as f:
tree[file_path] = f.read().replace(" ", "")

# 创建 RepositoryDocument 实例并保存到 MongoDB
user = kwargs["user"]
instance = self.model(
content=tree,
name=repo_name,
link=link,
platform="github",
author_id=user.id,
author_full_name=user.full_name,
)
instance.save()

except Exception:
raise # 若发生错误,抛出异常

finally:
# 确保删除临时目录以释放资源
shutil.rmtree(local_temp)
logger.info(f"Finished scraping GitHub repository: {link}")

CustomArticleCrawler

CustomArticleCrawler 类采用不同的方法来从互联网上收集数据。它利用 AsyncHtmlLoader 类来加载链接的完整 HTML 内容,再通过 Html2TextTransformer 类提取该 HTML 中的文本内容。这两个类由 langchain_communityPython 包提供,以下是相关模块的导入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from urllib.parse import urlparse
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_transformers.html2text import Html2TextTransformer
from loguru import logger
from domain.documents import ArticleDocument

import BaseCrawler
class CustomArticleCrawler(BaseCrawler):
model = ArticleDocument # MongoDB 中定义的文章类型的内容

def extract(self, link: str, **kwargs) -> None:
# 检查文章是否已经存在于数据库中,避免重复爬取
old_model = self.model.find(link=link)
if old_model is not None:
logger.info(f"Article already exists in the database: {link}")
return

# 如果文章不存在,进行爬取
logger.info(f"Starting scraping article: {link}")
loader = AsyncHtmlLoader([link]) # 使用 AsyncHtmlLoader 加载 HTML 内容
docs = loader.load() # 加载 HTML
html2text = Html2TextTransformer() # 使用 Html2TextTransformer 提取文本
docs_transformed = html2text.transform_documents(docs) # 转换为文本
doc_transformed = docs_transformed[0] # 获取第一个转换后的文档

# 提取文章内容和相关元数据
content = {
"Title": doc_transformed.metadata.get("title"),
"Subtitle": doc_transformed.metadata.get("description"),
"Content": doc_transformed.page_content,
"language": doc_transformed.metadata.get("language"),
}

# 解析 URL 来确定平台(或域名)
parsed_url = urlparse(link)
platform = parsed_url.netloc

# 创建文章模型实例,并保存到 MongoDB 数据库
user = kwargs["user"]
instance = self.model(
content=content,
link=link,
platform=platform,
author_id=user.id,
author_full_name=user.full_name,
)
instance.save()
logger.info(f"Finished scraping custom article: {link}")

解析

  1. 初始化方法:
    • CustomArticleCrawler 类继承自 BaseCrawler,并使用 ArticleDocument 作为数据模型来存储爬取的文章信息。
  2. extract() 方法:
    • 检查重复:首先检查文章是否已经存在于数据库中,如果存在,则不再重复爬取该文章。
    • 加载 HTML:如果文章不存在,使用 AsyncHtmlLoader 类加载提供的链接的 HTML 内容。
    • 提取文本:使用 Html2TextTransformer 类将 HTML 转换为纯文本,并返回一个包含文档内容的列表。我们只关心列表中的第一个文档。
    • 提取内容:从转换后的文档中提取标题、子标题、正文和语言等元数据。
    • 解析平台:通过解析 URL 来确定文章所属的平台或域名。
    • 保存数据:使用 ArticleDocument 类创建一个文章实例,并将提取的内容保存到 MongoDB 数据库中。
  3. LangChain 的应用
    • AsyncHtmlLoaderHtml2TextTransformer 类遵循 LangChain 的范式,这使得我们能够快速实现爬取和转换功能。虽然它们非常适用于大多数场景,但由于灵活性较差,难以进行深度定制,因此在生产环境中可能不适合所有情况,无法对某些特定网站或内容结构进行精细化控制。

NoSQL 数据仓库文档

我们之前设计了三个文档类来结构化我们的数据类别。这些类定义了文档所需的具体属性,例如内容、作者和来源链接。最佳实践是将数据结构化为类,而不是字典,因为我们对每个项目期望的属性更为详尽,从而减少运行时错误。例如,当从 Python 字典中访问值时,我们无法确保它存在或其类型是否正确。通过将数据项封装在类中,我们可以确保每个属性都符合预期。

通过利用像 Pydantic 这样的 Python 包,我们可以获得开箱即用的类型验证,确保数据集的一致性。因此,我们将数据类别建模为以下文档类,这些类已经在代码中使用过:

  • ArticleDocument
  • PostDocument
  • RepositoryDocument

这些不仅仅是简单的 Python 数据类或 Pydantic 模型。它们支持在 MongoDB 数据仓库上进行读写操作。为了将读写功能注入到所有文档类中,并避免重复代码,我们使用了对象-文档映射(ODM)软件模式,它基于对象关系映射(ORM)模式。因此,接下来我们将首先探讨 ORM,然后转向 ODM,最后深入研究我们的自定义 ODM 实现和文档类。

ODM 中间件

在讨论软件模式之前,让我们了解一下 ORM。ORM 是一种技术,允许您使用面向对象的范式查询和操作数据库中的数据。无需编写 SQL 或特定 API 的查询,您可以将所有复杂性封装到一个 ORM 类中,该类知道如何处理所有数据库操作,通常是 CRUD 操作。因此,使用 ORM 可以避免手动处理数据库操作,并减少手动编写样板代码的需要。ORM 与 SQL 数据库(如 PostgreSQL 或 MySQL)交互。

大多数现代 Python 应用程序在与数据库交互时都会使用 ORM。尽管 SQL 在数据领域仍然是一种流行的选择,但在 Python 后端组件中,您很少看到原始的 SQL 查询。最流行的 Python ORM 是 SQLAlchemy(https://www.sqlalchemy.org/)。此外,随着 FastAPI 的兴起,SQLModel(https://github.com/fastapi/sqlmodel)成为了一个常见选择,它是 SQLAlchemy 的一个封装,使其与 FastAPI 的集成更加简便。

ODM 模式与 ORM 非常相似,但它是针对 NoSQL 数据库(如 MongoDB)和无结构的集合操作,而不是 SQL 数据库和表。当我们使用 NoSQL 数据库时,数据结构主要集中在集合中,存储的是类似 JSON 的文档,而不是表中的行。

总之,ODM 简化了与基于文档的 NoSQL 数据库的交互,并将面向对象的代码映射到类似 JSON 的文档。接下来,我们将实现一个轻量级的 ODM 模块,基于 MongoDB,帮助我们深入理解 ODM 的工作原理。

接下来,我们定义了一个类型变量 T,它绑定到 NoSQLBaseDocument 类。该变量利用 Python 的泛型模块,允许我们将类的类型进行泛化。例如,在实现 ArticleDocument 类时,T 所有使用的地方都会被替换为 ArticleDocument 类型。

NoSQLBaseDocument 类继承了 Pydantic 的 BaseModel、Python 的 GenericABC 类(使其成为抽象基类),它是我们的基础 ODM 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import uuid
from abc import ABC
from typing import Generic, Type, TypeVar
from loguru import logger
from pydantic import UUID4, BaseModel, Field
from pymongo import errors
from llm_engineering.domain.exceptions import ImproperlyConfigured
from llm_engineering.infrastructure.db.mongo import connection
from llm_engineering.settings import settings
_database = connection.get_database(settings.DATABASE_NAME)

T = TypeVar("T", bound="NoSQLBaseDocument")

class NoSQLBaseDocument(BaseModel, Generic[T], ABC):
'''实现了 __eq__ 和 __hash__ 方法,允许比较实例并将其用于散列集合(如集合)或基于其唯一 id 属性作为字典键'''
id: UUID4 = Field(default_factory=uuid.uuid4)

def __eq__(self, value: object) -> bool:
if not isinstance(value, self.__class__):
return False
return self.id == value.id

def __hash__(self) -> int:
return hash(self.id)

@classmethod
'''将从 MongoDB 获取的字典转换为类实例'''
def from_mongo(cls: Type[T], data: dict) -> T:
if not data:
raise ValueError("Data is empty.")
id = data.pop("_id")
return cls(**dict(data, id=id))

def to_mongo(self: T, **kwargs) -> dict:
'''将模型实例转换为适合 MongoDB 插入的字典'''
exclude_unset = kwargs.pop("exclude_unset", False)
by_alias = kwargs.pop("by_alias", True)
parsed = self.model_dump(exclude_unset=exclude_unset, by_alias=by_alias, **kwargs)
if "_id" not in parsed and "id" in parsed:
parsed["_id"] = str(parsed.pop("id"))
for key, value in parsed.items():
if isinstance(value, uuid.UUID):
parsed[key] = str(value)
return parsed

def save(self: T, **kwargs) -> T | None:
'''利用上面描述的 to_mongo() 方法将实例转换为与 MongoDB 兼容的文档,并尝试将其插入数据库,处理可能发生的任何写入错误'''
collection = _database[self.get_collection_name()]
try:
collection.insert_one(self.to_mongo(**kwargs))
return self
except errors.WriteError:
logger.exception("Failed to insert document.")
return None

@classmethod
def get_or_create(cls: Type[T], **filter_options) -> T:
'''如果找到匹配的文档,则将其转换为类的实例。如果没有找到,则创建一个新实例,并使用过滤选项作为其初始数据并保存到数据库'''
collection = _database[cls.get_collection_name()]
try:
instance = collection.find_one(filter_options)
if instance:
return cls.from_mongo(instance)
new_instance = cls(**filter_options)
new_instance = new_instance.save()
return new_instance
except errors.OperationFailure:
logger.exception(f"Failed to retrieve document with filter options: {filter_options}")
raise

@classmethod
def bulk_insert(cls: Type[T], documents: list[T], **kwargs) -> bool:
'''批量写入文档以提高效率'''
collection = _database[cls.get_collection_name()]
try:
collection.insert_many([doc.to_mongo(**kwargs) for doc in documents])
return True
except (errors.WriteError, errors.BulkWriteError):
logger.error(f"Failed to insert documents of type {cls.__name__}")
return False

@classmethod
def find(cls: Type[T], **filter_options) -> T | None:
'''根据filter_options来查找数据库中的文档'''
collection = _database[cls.get_collection_name()]
try:
instance = collection.find_one(filter_options)
if instance:
return cls.from_mongo(instance)
return None
except errors.OperationFailure:
logger.error("Failed to retrieve document.")
return None

@classmethod
def bulk_find(cls: Type[T], **filter_options) -> list[T]:
'''检索与过滤选项匹配的多个文档。将每个检索到的 MongoDB 文档转换为模型实例,并将它们收集到列表中'''
collection = _database[cls.get_collection_name()]
try:
instances = collection.find(filter_options)
return [document for instance in instances if (document := cls.from_mongo(instance)) is not None]
except errors.OperationFailure:
logger.error("Failed to retrieve document.")
return []

@classmethod
def get_collection_name(cls: Type[T]) -> str:
'''确定与该类关联的 MongoDB 集合的名称。它期望该类具有嵌套的 Settings 类,该类具有指定集合名称的 name 属性'''
if not hasattr(cls, "Settings") or not hasattr(cls.Settings, "name"):
raise ImproperlyConfigured(
"Document should define an Settings configuration class with the name of the collection.")
return cls.Settings.name

数据类别和用户文档类

最后,我们看一下继承自 NoSQLBaseDocument 基类的子类的实现。这些是定义我们数据类别的具体类。你已经在本章中看到这些类,它们用于爬虫类中的文章、仓库和帖子。

首先定义了一个 enum 类,将所有数据类别类型集中管理。这些变量将作为常量,用于配置本书中的所有 ODM 类。

1
2
3
4
5
6
7
8
9
10
11
12
13
from enum import StrEnum

class DataCategory(StrEnum):
PROMPT = "prompt"
QUERIES = "queries"
INSTRUCT_DATASET_SAMPLES = "instruct_dataset_samples"
INSTRUCT_DATASET = "instruct_dataset"
PREFERENCE_DATASET_SAMPLES = "preference_dataset_samples"
PREFERENCE_DATASET = "preference_dataset"
POSTS = "posts"
ARTICLES = "articles"
REPOSITORIES = "repositories"
USERS = "users"

Document 类被引入作为其他文档类的抽象基类,基于 NoSQLBaseDocument ODM 类。它包括通用的属性,如内容、平台和作者信息,为所有继承自它的文档提供了标准化结构:

1
2
3
4
5
class Document(NoSQLBaseDocument, ABC):
content: dict
platform: str
author_id: UUID4 = Field(alias="author_id")
author_full_name: str = Field(alias="author_full_name")

具体的文档类型通过继承 Document 类来定义。RepositoryDocumentPostDocumentArticleDocument 类分别代表不同的数据类别,每个类别都有独特的字段和设置,指定它们在数据库中的集合名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class RepositoryDocument(Document):
name: str
link: str
class Settings:
name = DataCategory.REPOSITORIES

class PostDocument(Document):
image: Optional[str] = None
link: str | None = None
class Settings:
name = DataCategory.POSTS

class ArticleDocument(Document):
link: str
class Settings:
name = DataCategory.ARTICLES

最后,我们定义了 UserDocument 类,用于存储和查询所有来自 LLM Twin 项目的用户:

1
2
3
4
5
6
7
8
9
class UserDocument(NoSQLBaseDocument):
first_name: str
last_name: str
class Settings:
name = DataCategory.USERS

@property
def full_name(self):
return f"{self.first_name} {self.last_name}"

通过实现 NoSQLBaseDocument ODM 类,我们将重点放在了每个文档或领域实体的字段和特定功能上。所有 CRUD 功能都委托给了父类。通过利用 Pydantic 定义字段,我们还获得了开箱即用的类型验证。例如,当创建 ArticleDocument 类的实例时,如果提供的链接为 None 或不是字符串,系统将抛出一个错误,提示数据无效。

到此为止,我们已经完成了数据采集管道的实现,首先是 ZenML 组件,然后是爬虫的实现,最后封装成 ODM 类和数据类别文档。接下来的步骤是运行数据采集管道并将原始数据导入 MongoDB 数据仓库。

总结

最后,我们用一个 UML 来直观的总结这一章节中关于数据收集的代码设计结构:

classDiagram
    class NoSQLBaseDocument {
    	+UUID4 id
    	+__eq__(value: object) bool
    	+__hash__() int
    	+from_mongo(data: dict) NoSQLBaseDocument
    	+to_mongo(**kwargs) dict
    	+save(**kwargs) NoSQLBaseDocument
    	+get_or_create(**filter_options) NoSQLBaseDocument
    	+bulk_insert(documents: list) bool
    	+find(**filter_options) NoSQLBaseDocument
    	+bulk_find(**filter_options) list
    	+get_collection_name() str
    }

    class Document {
    	+dict content
    	+str platform
    	+UUID4 author_id
    	+str author_full_name
    }

    class RepositoryDocument {
    	+str name
    	+str link
    	+Settings
    }

    class PostDocument {
    	+Optional[str] image
    	+Optional[str] link
    	+Settings
    }

    class ArticleDocument {
    	+str link
    	+Settings
    }

    class UserDocument {
    	+str first_name
    	+str last_name
    	+Settings
    	+full_name()
    }

    class DataCategory {
    	<>
    	+PROMPT
    	+QUERIES
    	+INSTRUCT_DATASET_SAMPLES
    	+INSTRUCT_DATASET
    	+PREFERENCE_DATASET_SAMPLES
    	+PREFERENCE_DATASET
    	+POSTS
    	+ARTICLES
    	+REPOSITORIES
    }
    
    class CrawlerDispatcher {
    	-Dict[str, Type[BaseCrawler]] _crawlers
    	+build() CrawlerDispatcher
    	+register_xxx() CrawlerDispatcher
    	-register(domin:str, crawler:Type[BaseCrawler])
    	+get_crawler(url:str) BaseCrawler
    }
    
    class BaseCrawler {
    	<>
    	+NoSQLBaseDocument model
    	+extract(link:str, **kwargs) Any
    }
    
    class BaseSeleniumCrawler {
    	<>
    	+int scroll_limit
    	+driver
    	+set_extra_driver_options(op:Options)
    	+login()
    	+scroll_page()
    }
    
    class GithubCrawler
    class CustomArticleCrawler
    class MediumCrawler
    
    class MongoDB

    NoSQLBaseDocument <|-- document : 继承自 <|-- repositorydocument postdocument articledocument nosqlbasedocument userdocument --> DataCategory : 引用
    PostDocument --> DataCategory : 引用
    ArticleDocument --> DataCategory : 引用
    UserDocument --> DataCategory : 引用
    
    CrawlerDispatcher --> BaseCrawler : 引用
    BaseCrawler --> NoSQLBaseDocument : 引用
    BaseCrawler <|-- baseseleniumcrawler : 继承自 githubcrawler --|> BaseCrawler : 继承自
    CustomArticleCrawler --|> BaseCrawler : 继承自
    MediumCrawler --|> BaseSeleniumCrawler : 继承自
    
    NoSQLBaseDocument ..> MongoDB : 操作数据库

四、检索增强生成(RAG)管道

检索增强生成(RAG)在大多数生成式 AI 应用中是非常重要的。RAG 的核心职责是将自定义数据注入到大型语言模型(LLM)中,以执行给定的操作(例如总结、重述和提取注入的数据)。通常希望在 LLM 中使用它没有训练过的数据(例如私有数据或新数据),由于微调 LLM 是一项非常昂贵的操作,因此 RAG 成为了一种非常有吸引力的策略,可以绕过不断微调的需求来访问这些新数据。

我们将从理论部分开始,重点介绍 RAG 的基础和它如何工作。接着,我们将逐步引导你了解一个简单的 RAG 系统的所有组件:分块、嵌入和向量数据库。最终,我们将介绍用于高级 RAG 系统的各种优化。然后,我们将继续探索 LLM Twin 的 RAG 特性管道架构。在这一步,我们将应用章节开头讨论的所有理论内容。最后,我们将通过实现 LLM Twin 的 RAG 特性管道来演示实践中的应用。

本章的主要内容如下:

  • 理解 RAG
  • 高级 RAG 概述
  • 探索 LLM Twin 的 RAG 特性管道架构
  • 实现 LLM Twin 的 RAG 特性管道

理解 RAG

RAG 通过从外部数据源检索信息来增强生成式 AI 模型的准确性和可靠性。它是一种与 LLM 内部知识互补的技术。在深入细节之前,我们先来理解 RAG 的含义:

  • 检索(Retrieval):搜索相关数据
  • 增强(Augmented):将数据作为上下文添加到提示中
  • 生成(Generation):使用增强后的提示与 LLM 进行生成

任何 LLM 都只能理解它所训练过的数据,这通常被称为参数化知识。因此,即使 LLM 能够完美地回答过去发生的事情,但对于最新的数据或它未曾训练过的任何外部资源,它也无法得知。另一种情况是,它可能会自信地“幻想”并提供错误的答案。

要理解 RAG,首先你需要知道,在使用 RAG 时,我们将必要的信息注入到提示中,将增强后的提示传递给 LLM,进行最终回答。此时,LLM 会利用额外的上下文来回答用户问题。

RAG 解决了两个根本问题:

  • 幻觉(Hallucinations)
  • 过时或私有信息(Old or private information)

幻觉(Hallucinations)

如果一个没有使用 RAG 的聊天机器人被问到它没有训练过的问题,它很可能会”自信“地给出一个错误答案,让人难以分辨真假。即使 LLM 并不总是产生幻觉,这种情况仍然会引发对其答案可信度的担忧。因此,就带来了两个问题:

  • 什么时候可以信任 LLM 的回答?
  • 如何评估答案是否正确?

通过引入 RAG,我们强制 LLM 仅根据提供的上下文进行回答。LLM 作为推理引擎,而 RAG 提供的额外信息则充当生成答案的唯一真实来源。这样,我们可以快速评估 LLM 的答案是否基于外部数据。

过时信息(Old Information)

任何 LLM 都只能在某个特定时间点上的全世界知识的子集上进行训练或微调,主要有以下三个原因:

  • 私有数据(Private data):你无法在没有所有权或使用权的数据上训练模型。
  • 新数据(New data):新数据每时每刻都在产生,因此必须不断训练 LLM 以跟上更新。
  • 成本(Costs):训练或微调 LLM 是一项极其昂贵的操作,因此无法频繁进行。

RAG 解决了这些问题,因为你不再需要不断微调 LLM 以适应新数据(甚至是私有数据)。只需将必要的数据直接注入到 LLM 处理的提示(prompt)中,就能生成正确且有价值的答案。

简单 RAG 框架

RAG 系统的基本结构都非常相似。我们将首先集中了解最简单形式的 RAG。请注意,我们将简单的 RAG(Vanilla RAG) 和原始 RAG(Naive RAG) 交替使用,以避免重复。

RAG 系统由三个主要模块组成,彼此独立:

  • 数据输入管道(Ingestion pipeline):一个用于填充向量数据库的批处理或流式管道。
  • 检索管道(Retrieval pipeline):一个查询向量数据库并检索与用户输入相关的条目的模块。
  • 生成管道(Generation pipeline):使用检索到的数据来增强提示并与 LLM 一起生成答案。

由于这三个组件是独立的类或服务,我们将分别深入了解它们。但目前,让我们尝试回答“这三个模块如何连接?”的问题。以下是一个非常简单的概述:

  1. 在后台,数据输入管道根据计划或持续运行,将外部数据填充到向量数据库中;
  2. 在客户端,用户提出一个问题;
  3. 问题传递给检索模块,检索模块对用户输入进行预处理并查询向量数据库;
  4. 生成管道使用提示模板、用户输入和检索到的上下文来创建提示;
  5. 提示传递给 LLM 以生成答案;
  6. 答案显示给用户。

screenshot_2025-03-12_16.04.00

当你需要访问任何类型的外部信息时,就必须在你的生成式 AI 应用中实现 RAG。例如,在实现一个财务助手时,你很可能需要访问最新的新闻、报告和价格,才能提供有价值的答案。或者,如果你构建一个旅行推荐系统,你必须检索并解析一份潜在景点、餐馆和活动的列表。在训练时,LLM 并没有访问你的特定数据,因此你将经常需要在生成式 AI 项目中实现 RAG 策略。

现在,让我们深入探讨数据输入、检索和生成管道。

数据输入管道(Ingestion pipeline)

RAG 摄取管道从各种数据源(例如数据仓库、数据湖、网页等)提取原始文档。然后,它会清理、分块并嵌入文档。最后,它会将嵌入的块加载到向量数据库(或其他类似的向量存储)中。

因此,RAG 数据输入管道进一步分为以下几个模块:

  1. 数据提取模块(data extraction)
    该模块负责从各种数据源(如数据库、API 或网页)中收集所需的数据。这个模块高度依赖于你的数据。它可以简单地通过查询数据仓库来完成,也可以是更复杂的操作,例如爬取维基百科等网站。
  2. 清洗层(cleaning)
    清洗层对提取的数据进行标准化处理,并移除不需要的字符。这样可以确保数据质量,提高后续处理的准确性。
  3. 切分模块(chunking)
    该模块将清洗后的文档拆分成较小的部分。由于我们希望将文档的内容传递给嵌入模型,因此必须确保内容不超过模型的最大输入大小。切分还需要确保将语义上相关的区域分开。例如,在切分一本书的章节时,最优的方式是将相似的段落分到同一个切片中。这样做可以确保在检索时,只将必要的数据添加到提示中。
  4. 嵌入组件(embedding)
    嵌入组件使用嵌入模型将切片内容(如文本、图片、音频等)映射到一个密集的向量中,这个向量承载了语义信息。在本章的后续部分,我们将深入讨论嵌入模型。
  5. 加载模块(loading)
    加载模块负责将嵌入后的切片及其元数据文档存储到数据库中。元数据将包含一些关键信息,例如嵌入的内容、该切片的源 URL、该内容在网页上发布的时间等。嵌入向量作为索引,用于查询相似的切片,而元数据则用于访问用来增强提示的信息。

检索管道(Retrieval Pipeline)

检索模块接收用户输入(文本、图像、音频等),将其嵌入,并查询向量数据库(DB)以寻找与用户输入相似的向量。

检索步骤的主要功能是将用户输入投影到与嵌入数据库中作为索引的嵌入相同的向量空间。这使得我们能够通过比较向量存储中的嵌入与用户输入的向量来找到最相似的前 K 个条目。这些条目随后作为增强提示的一部分,传递给 LLM 用于生成答案。

为了比较两个向量,必须使用一种距离度量方法,例如欧几里得距离或曼哈顿距离。但最常用的距离度量是余弦距离,计算公式为:

CosineDistance=1cos(θ)=1ABABCosineDistance = 1 - \cos(\theta) = 1 - \frac{A \cdot B}{\left \| A \right \| \cdot \left \| B \right \|}

其中,θ\theta 是两个向量之间的夹角。余弦距离的值范围从 -1 到 1:

  • 当两个向量完全相反时,余弦距离为 -1;
  • 当两个向量正交时,余弦距离为 0;
  • 当两个向量指向相同的方向时,余弦距离为 1。

通常,余弦距离在非线性复杂的向量空间中表现良好。然而,需要注意的是,选择合适的向量之间的距离度量方法取决于数据和所使用的嵌入模型。

关键因素需要特别强调的是,用户的输入和嵌入必须位于相同的向量空间中。否则,你就无法计算它们之间的距离。为此,必须以与 RAG 数据输入管道中处理原始文档相同的方式对用户输入进行预处理。这意味着必须清理用户输入,必要时进行切分,并使用相同的函数、模型和超参数来嵌入用户输入。这与训练和推理时数据特征的预处理方法相似。如果处理不一致,推理结果可能会不准确,这种现象也被称为“训练-服务偏差”(training-serving skew)。

生成管道(Generation Pipeline)

RAG 系统的最后一步是获取用户的输入,检索相关数据,将其传递给大语言模型(LLM),并生成有价值的答案。

在这一步中,最终的提示(prompt)是由系统模板和用户查询以及检索到的上下文填充而成的。根据应用的不同,您可能会使用一个单独的提示模板或多个提示模板。通常,所有的提示工程(prompt engineering)工作都是在提示模板的层面进行的。

以下是一个虚拟示例,展示了一个通用的系统和提示模板,并说明它们如何与检索逻辑和 LLM 一起使用,生成最终的答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
system_template = """
You are a helpful assistant who answers all the user's questions politely.
"""

prompt_template = """
Answer the user's question using only the provided context. If you cannot
answer using the context, respond with "I don't know."

Context: {context}
User question: {user_question}
"""

user_question = "<your_question>"
retrieved_context = retrieve(user_question)

prompt = f"{system_template}\n"
prompt += prompt_template.format(context=retrieved_context, user_question=user_question)
answer = llm(prompt)

随着提示模板的不断演化,每次修改都应使用机器学习操作(MLOps)的最佳实践进行跟踪和版本控制。这样,在训练或推理时,您始终可以知道给定的答案是由特定版本的 LLM 和提示模板生成的。您可以通过 Git 跟踪版本,或者将提示模板存储在数据库中,或者使用像 LangFuse 这样的专门提示管理工具。

正如我们在检索管道中看到的,直接影响 RAG 系统准确性的一些关键方面是外部数据的嵌入,通常存储在向量数据库中,用户查询的嵌入,以及我们如何使用余弦距离等函数来衡量这两者之间的相似性。为了更好地理解 RAG 算法的这一部分,我们将深入探讨嵌入是什么以及它们如何计算。

嵌入(Embeddings)

什么是嵌入(Embeddings)?

想象一下,我们正在教计算机理解世界。嵌入就像是一个特殊的翻译器,将这些事物转化为数值代码。不过,这些代码并不是随机的,因为相似的单词或物品会被赋予彼此接近的代码。它就像一张地图,其中具有相似含义的单词被聚集在一起。

从更理论的角度来看,嵌入是物体的稠密数值表示,这些物体以向量的形式编码在一个连续的向量空间中,可能是单词、图像,或者推荐系统中的物品。这个转化帮助捕捉物体之间的语义意义和关系。例如,在自然语言处理(NLP)中,嵌入将单词转换为向量,使得语义相似的单词在向量空间中彼此接近

嵌入的可视化

一种常见的方法是可视化嵌入,以便理解和评估它们之间的几何关系。由于嵌入通常有超过 2 或 3 维,通常在 64 到 2048 之间,因此需要将其重新投影到 2D 或 3D 空间中。

例如,你可以使用 UMAPUMAP文档),这是一种降维方法,因其能够在将嵌入投影到 2D 或 3D 时保持点之间的几何特性而受到广泛欢迎。另一个常用的降维算法是 t-SNEt-SNE文档)。然而,相比 UMAP,t-SNE 更具随机性,并且不总是保持点之间的拓扑关系。

总结来说,嵌入是一个将对象转换为能够捕捉其语义关系的数值向量的技术,广泛应用于自然语言处理、图像处理和推荐系统等领域。

为什么嵌入(Embeddings)如此强大

首先,机器学习模型只能处理数值型数据。当处理表格数据时,这通常不是问题,因为数据通常是数值型的,或者可以很容易地转换为数字。然而,当我们希望将文字、图像或音频数据输入模型时,嵌入就显得特别有用。

例如,在处理 Transformer 模型时,您需要对所有文本输入进行分词,每个分词都会关联一个嵌入(embedding),神经网络的密集层可以轻松地处理这些嵌入(embedding)。

基于这个例子,您可以使用嵌入来编码任何类别变量,并将其传递给机器学习模型。那么,为什么不使用其他简单的方法,如独热编码(One-hot Encoding)呢?当处理具有高基数的类别变量时,如语言词汇表,使用其他经典方法会遭遇“维度灾难”(curse of dimensionality)。例如,如果您的词汇表有 10,000 个标记,那么应用独热编码后,每个标记的长度就是 10,000。如果输入序列有 N 个标记,那么输入参数将变成 N×10,000N \times 10,000。若 N>=100N >= 100,则文本输入通常会过于庞大,无法使用。另一个经典方法,如哈希编码(Feature Hashing),虽然不受维度灾难的困扰,但它会丢失向量之间的语义关系。

[!TIP]

独热编码

  • 是一种将类别变量转换为二进制矩阵表示的技术。每个类别都会被表示为一个唯一的二进制向量。对于每个类别变量,都会创建一个二进制向量,其长度等于唯一类别的数量,除对应类别的索引外,所有值都为零。
  • 优点:保留了类别的所有信息,简单且可解释。
  • 缺点:当类别变量有许多唯一值时,特征空间会变得非常高维,导致该方法在实际应用中不可行。

特征哈希

  • 也称为哈希编码或“哈希技巧”,是一种通过对类别值应用哈希函数将类别变量转换为数值特征的方法。与独热编码相比,这种方法不受唯一类别数的限制,而是通过将类别映射到固定数量的箱或桶中来减少特征空间的维度。
  • 优点:减少了特征空间的维度,这在处理高基数类别变量时特别有用。在内存使用和计算时间方面具有较高的效率。
  • 缺点:哈希可能会发生冲突,即不同的类别可能会映射到同一个桶中,从而导致信息丢失。此外,这种映射使得方法不可解释,并且很难理解原始类别和哈希特征之间的关系。

嵌入的优势

嵌入帮助我们编码类别变量,并且可以控制输出向量的维度。与朴素的哈希技巧不同,嵌入使用巧妙的方法将信息压缩到比哈希技巧更低维的空间中。

其次,通过对输入进行嵌入,我们可以降低它的维度,并将所有语义信息压缩成一个密集的向量。这在处理图像时是一种非常流行的技术,其中卷积神经网络(CNN)编码模块将高维的图像信息映射为嵌入,然后通过CNN解码器对该嵌入进行处理,进行分类或回归操作。

Typical_cnn

上图是一个典型的CNN结构的示意图。想象一下每个层中的小方块,它们是“感受区块”。每个小方块将信息传递给前一层的单个神经元。在网络的不同层级中,主要发生了两个关键操作:

  • 缩小图像尺寸:特殊的“采样 subsampling”操作将图像尺寸减小,集中关注图像的关键信息。
  • 学习特征:另一方面,“卷积 convolution”操作会增加图像特征层的尺寸,以便网络能够从图像中学习更复杂的特征。

最终,网络中的完全连接层会将所有这些处理过的信息转化为最终的向量嵌入,即图像的数值表示。

如何创建嵌入(Embeddings)

嵌入是通过深度学习模型创建的,这些模型能够理解输入的上下文和语义,并将其投影到一个连续的向量空间中。根据数据输入的类型,嵌入的创建方法有所不同。因此,在选择嵌入模型之前,理解你的数据和需求至关重要。

文本数据中的嵌入

例如,当处理文本数据时,Word2VecGloVe 是早期用于创建词汇嵌入的常见方法,它们在一些简单的应用中仍然广泛使用。另一种流行的方法是使用 encoder-onlytransformers,例如 BERT,及其系列中的其他方法,例如 RoBERTa。这些模型利用转换器架构的编码器将您的输入智能地投影到密集向量空间中,稍后可将其用作嵌入。

[!TIP]

BERT

基于 Transformer 的双向编码器表示技术(英语:Bidirectional Encoder Representations from Transformers,BERT)是用于自然语言处理(NLP)的预训练技术,由 Google 提出。通过在所有层中共同调整左右情境,利用无标记文本预先训练深度双向表示(深度双向、无监督式)。同时考虑句子中单词的左右上下文,BERT 使用双向方法,而不是按顺序分析文本,BERT 同时查看句子中的所有单词。

例如:“The bank is situated on the ___ of the river.”

  • 在单向模型中,对空白的理解将严重依赖于前面的单词,并且模型可能难以辨别*“bank”*是指银行还是河的一侧。

  • BERT 是双向的,它同时考虑左侧(“The bank is situated on the”)和右侧上下文(“of the river”),从而实现更细致的理解。它理解缺失的单词可能与银行的地理位置有关,展示了双向方法带来的语境丰富性。

BERT 的创新之处在于借助 Transformer 学习双向表示,Transformer 是一种深度学习组件,不同于递归神经网络 (RNN) 对顺序的依赖性,它能够并行处理整个序列。因此可以分析规模更大的数据集,并加快模型训练速度。Transformer 能够使用注意力机制收集词语相关情境的信息,并以表示该情境的丰富向量进行编码,从而同时处理(而非单独处理)与句中所有其他词语相关的词语。该模型能够学习如何从句段中的每个其他词语衍生出给定词语的含义。

BERT GPT
架构 BERT 专为双向表征学习而设计。它使用掩码语言模型目标,根据左右上下文预测句子中缺失的单词。 另一方面,GPT 是为生成式语言建模而设计的。它利用单向自回归方法,根据前面的上下文预测句子中的下一个单词。
预训练目标 BERT 使用掩码语言模型目标和下一句预测进行预训练。它专注于捕捉双向上下文并理解句子中单词之间的关系。 GPT 经过预先训练,可以预测句子中的下一个单词,这有助于模型学习语言的连贯表示并生成上下文相关的序列。
上下文理解 BERT 对于需要深入理解句子内的上下文和关系的任务非常有效,例如文本分类、命名实体识别和问答。 GPT 擅长生成连贯且上下文相关的文本。它常用于创意任务、对话系统和需要生成自然语言序列的任务。
任务类型和用例 常用于文本分类、命名实体识别、情感分析和问答等任务。 应用于文本生成、对话系统、总结和创意写作等任务。
微调与小样本学习 BERT 通常使用标记数据针对特定的下游任务进行微调,以使其预训练表示适应当前的任务。 GPT 旨在执行小样本学习,它可以用最少的特定任务训练数据推广到新任务。

下面是一个简单的 Python 代码示例,演示如何使用 SentenceTransformer 来计算句子的嵌入,并计算它们之间的余弦相似度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
sentences = [
"The dog sits outside waiting for a treat.",
"I am going swimming.",
"The dog is swimming."
]

# 计算嵌入
embeddings = model.encode(sentences)
print(embeddings.shape) # 输出: [3, 384]

# 计算余弦相似度
similarities = model.similarity(embeddings, embeddings)
print(similarities)
# 输出:
# tensor([[ 1.0000, -0.0389, 0.2692],
# [-0.0389, 1.0000, 0.3837],
# [ 0.2692, 0.3837, 1.0000]])

在这个例子中,我们开源看到这三个句子的嵌入向量与它们之间的余弦相似度:

  • 第一个句子与其自身的相似度为 1。
  • 第一个和第二个句子的相似度接近 0,说明它们没有什么相似性。
  • 第一个和第三个句子的相似度较高,表明它们有一些共同的上下文。

嵌入模型因为使用场景的不同而变化。我们可以在 Hugging Face 上的 Massive Text Embedding Benchmark (MTEB) 上找到特定的模型。根据需求,可以考虑性能最佳的模型、准确率最高的模型或内存占用最小的模型。同时,使用 Hugging FaceSentenceTransformer 使不同模型之间的切换变得简单。因此,您始终可以尝试各种选项。

处理图像时,您可以使用卷积神经网络 (CNN) 嵌入它们。流行的 CNN 网络基于 ResNet 架构。但是,我们不能直接将图像嵌入技术用于录音。相反,我们可以创建音频的视觉表示,例如频谱图,然后将图像嵌入模型应用于这些视觉效果。这使我们能够以计算机可以理解的方式捕捉图像和声音的本质。

通过利用 CLIP 之类的模型,您实际上可以将一段文本和一张图片嵌入到同一个向量空间中。这样您就可以使用句子作为输入来查找相似的图像,反之亦然,这证明了 CLIP 的实用性。

crazy_cat
  • “A crazy cat smiling.”
  • “A white and brown cat with a yellow bandana.”
  • “A man eating in the garden.”

在下面的代码片段中,我们使用 CLIP 对一张小猫图像(如上图)和三句话进行编码。最后,我们使用余弦相似度来计算图片和句子之间的相似度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from io import BytesIO
import requests
from PIL import Image
from sentence_transformers import SentenceTransformer

image = Image.open(image_dir)
model = SentenceTransformer("clip-ViT-B-32")
img_emb = model.encode(image)
text_emb = model.encode(
[
"A crazy cat smiling.",
"A white and brown cat with a yellow bandana.",
"A man eating in the garden."
])
print(text_emb.shape) # noqa # Output: (3, 512)
similarity_scores = model.similarity(img_emb, text_emb) print(similarity_scores) # noqa
# Output: tensor([[0.3068, 0.3300, 0.1719]])

至此,我们简要介绍了如何计算嵌入。具体实现的范围很广,大多数数据类别(如单词、句子、文档、图像、视频和图形)都可以计算嵌入。当我们需要计算两个不同数据类别之间的距离(例如句子向量和图像向量之间的距离)时,必须使用专门的模型,这一点至关重要。这些模型旨在将两种数据类型投影到同一个向量空间(如 CLIP),以确保准确的距离计算。

向量数据库

向量数据库是专门设计用于高效存储、索引和检索向量嵌入的数据库。传统的基于标量的数据库难以应对向量数据的复杂性,因此向量数据库对于实时语义搜索等任务至关重要。

虽然像 FAISS 这样的独立向量索引对于相似性搜索很有效,但它们缺乏向量数据库的全面数据管理功能。向量数据库支持 CRUD 操作、元数据过滤、可扩展性、实时更新、备份、生态系统集成和强大的数据安全性,因此它们比独立索引更适合生产环境。

向量数据库如何工作?

想想您通常如何搜索数据库。您输入一些特定内容,系统会输出精确匹配。这就是传统数据库的工作方式。向量数据库则不同。我们不是寻找完美匹配,而是寻找查询向量的最近邻居。在底层,向量数据库使用近似最近邻 (Approximate Nearest Neighbor, ANN) 算法来查找这些近邻。

虽然 ANN 算法不会返回给定搜索的最佳匹配,但标准最近邻算法在实践中太慢了。此外,经验表明,仅使用给定输入查询的最佳匹配的近似值就可以很好地工作。因此,准确性和延迟之间的权衡之下最终选择了 ANN 算法。

这是向量数据库的典型工作流程:

  1. 索引向量:使用针对高维数据优化的数据结构对向量进行索引。常见的索引技术包括分层可导航小世界 (HNSW)、随机投影、乘积量化 (PQ) 和局部敏感哈希 (LSH)。

  2. 查询相似性:在搜索过程中,数据库查询索引向量以找到与输入向量最相似的向量。此过程涉及基于相似性度量(例如余弦相似性、欧几里得距离或点积)比较向量。每种方法都有独特的优势,适用于不同的用例。

  3. 结果的后处理:在识别潜在匹配后,对结果进行后处理以提高准确性。此步骤可确保将最相关的向量返回给用户。

向量数据库可以在向量搜索之前或之后根据元数据过滤结果。这两种方法在性能和准确性方面都有权衡。查询还依赖于元数据(以及向量索引),因此它包含用于过滤操作的元数据索引用户。

向量索引的算法

向量数据库使用各种算法来创建向量索引并高效地管理搜索数据:

  • 随机投影:随机投影通过使用随机矩阵将向量投影到低维空间来降低向量的维数。该技术保留了向量之间的相对距离,从而有助于更快地进行搜索。
  • 乘积量化(PQ):PQ 通过将向量分成更小的子向量,然后将这些子向量量化为代表性代码来压缩向量。这减少了内存使用量并加快了相似性搜索。
  • 局部敏感哈希 (LSH):LSH 将相似的向量映射到存储桶中。此方法通过关注数据子集来实现快速近似最近邻搜索,从而降低计算复杂度。在查询时,只查找同一个桶内的近邻,从而提高查询效率。
  • 分层可导航小世界 (HNSW):HNSW 构建一个多层图,其中每个节点代表一组向量。相似的节点相互连接,允许算法导航图并有效地找到最近的邻居。

这些算法使向量数据库能够有效地处理复杂和大规模数据,使其非常适合各种 AI 和 ML 应用程序。

数据库操作

向量数据库还与普通数据库拥有共同的一些特点,以确保高性能、容错性和在生产环境中的易管理性。关键操作包括:

  • 分片和复制:数据会被划分(分片)到多个节点上,以确保可扩展性和高可用性。数据在节点之间的复制有助于维护数据完整性,并在节点失败时确保数据的可用性。
  • 监控:持续监控数据库性能,包括查询延迟和资源使用情况(RAM、CPU、磁盘),有助于维持最佳操作,并在潜在问题影响系统之前发现它们。
  • 访问控制:实施强大的访问控制机制,确保只有授权用户可以访问和修改数据。这包括基于角色的访问控制和其他安全协议来保护敏感信息。
  • 备份:定期的数据库备份对灾难恢复至关重要。自动备份过程确保在数据损坏或丢失的情况下,能够恢复到之前的状态。

高级 RAG 框架

我们刚刚介绍的基础RAG框架并没有解决许多影响检索和答案生成质量的基本方面,比如:

  • 检索到的文档是否与用户的问题相关?
  • 检索到的上下文是否足够回答用户的问题?
  • 是否有冗余信息,只是增加了增强提示的噪音?
  • 检索步骤的延迟是否符合我们的要求?
  • 如果我们无法使用检索到的信息生成有效答案,应该怎么办?

从上述问题中,我们可以得出两个结论。第一个结论是,我们需要一个强大的评估模块来衡量和量化检索数据的质量,并根据用户的问题生成答案。我们将在第九章详细讨论这个话题。第二个结论是,我们必须改进RAG框架,直接在算法中解决检索的限制。这些改进被称为高级RAG

基础RAG设计可以在三个不同的阶段进行优化:

  • 检索前:这个阶段关注如何构建和预处理数据,以优化数据索引和查询。
  • 检索:这个阶段主要围绕改进嵌入模型和元数据过滤,以提高向量检索步骤的效果。
  • 检索后:这个阶段主要聚焦于通过不同方式过滤检索到的文档中的噪音,并在将其输入LLM进行答案生成之前对提示进行压缩。

检索前优化

检索前的优化步骤有两种不同的方式:

  • 数据索引:这是RAG摄取管道的一部分,主要是在清理或分块模块中实现,目的是对数据进行预处理,以便更好地进行索引。
  • 查询优化:该算法直接作用于用户的查询,在嵌入查询并从向量数据库中检索数据之前。

数据索引

在我们使用嵌入来索引数据时,嵌入语义上代表了分块文档的内容,因此,大部分数据索引技术都专注于通过更好的预处理和数据结构化来提高检索效率,例如:

  • 滑动窗口(Sliding Window):滑动窗口技术在文本块之间引入重叠,确保在块边界附近的重要上下文得到保留,从而提高检索的准确性。这在一些领域中尤为有用,如法律文件、科研论文、客户支持日志和医疗记录等,因为关键信息往往跨越多个部分。嵌入是在文本块及其重叠部分上计算的,因此滑动窗口技术通过保持跨越边界的上下文来增强系统检索相关和连贯信息的能力。

  • 增强数据粒度:包括数据清理技术,如删除无关细节、验证事实准确性以及更新过时信息。一个干净且准确的数据集有助于更精确的检索。

  • 元数据(Metadata):添加如日期、URL、外部ID或章节标记等元数据标签,帮助在检索时有效地过滤结果。

  • 优化索引结构:基于不同的数据索引方法,如不同的块大小和多重索引策略。

  • 小到大(Small-to-big):该算法将用于检索的文本块与最终生成答案时使用的上下文解耦。该算法使用较小的文本序列来计算嵌入,同时保留该序列本身以及周围较大的窗口作为元数据。因此,使用较小的块可以提高检索的准确性,而更大的上下文则为LLM提供更多的上下文信息。但是,如果我们使用整篇文本来计算嵌入,可能会引入太多噪音,或者文本包含多个主题,这会导致嵌入的整体语义表示较差。

查询优化 (Query Optimization)

在查询优化方面,我们可以利用查询路由、查询重写和查询扩展等技术,进一步优化为LLM检索到的信息:

  • 查询路由 (Query Routing):根据用户的输入,可能需要与不同类别的数据交互,并分别查询每个类别。查询路由用于根据用户的输入决定采取何种操作,类似于if/else语句,但决策完全基于自然语言,而不是逻辑语句。

    screenshot_2025-03-13_10.28.21

    如图所示,假设基于用户的输入,为了执行 RAG,我们可以使用向量搜索查询从向量数据库中检索附加的上下文,或者通过将用户查询转换为 SQL 命令来从标准 SQL 数据库中检索,或者通过利用 REST API 调用从互联网中获取。查询路由器还可以检测是否需要上下文,帮助我们避免对外部数据存储进行冗余调用。此外,查询路由器还可以用来选择最佳的提示模板。例如,在 LLM Twin 用例中,根据用户是想要一段文章、一个帖子还是一段代码片段,需要不同的提示模板来优化创作过程。查询路由通常使用 LLM 来决定采取哪条路径,或者通过选择具有最相似向量的路径来进行嵌入选择。总的来说,查询路由类似于 if/else 语句,但它更具灵活性,因为它直接与自然语言交互。

  • 查询重写 (Query Rewriting):有时,用户的初始查询可能与数据的结构不完全对齐。查询重写通过重新构造问题,使其更好地匹配已索引的信息来解决这个问题。常见的方法包括:

    • 释义 (Paraphrasing):在保持原意的基础上重述用户的查询(例如,“气候变化的原因是什么?”可以重写为“导致全球变暖的因素”)。
    • 同义词替换 (Synonym Substitution):用同义词替换不常见的词,以扩大搜索范围(例如,“joyful”可以重写为“happy”)。
    • 子查询 (Sub-queries):对于较长的查询,可以将其拆分为多个更短且更集中的子查询。这有助于检索阶段更精确地识别相关文档。
  • 假设文档嵌入 (HyDE):该技术通过让 LLM 生成对查询的假设响应。然后,将原始查询和LLM的响应一起送入检索阶段。

  • 查询扩展 (Query Expansion):该方法通过添加额外的术语或概念来丰富用户的提问,从而提供同一初始问题的不同视角。例如,在搜索“疾病”时,可以利用同义词和与原始查询词相关的术语,甚至包括“疾病”或“病症”。

  • 自查询 (Self-query):核心思想是将非结构化查询映射到结构化查询。LLM 识别输入文本中的关键实体、事件和关系,并将这些身份作为过滤参数,用于减少向量搜索的空间(例如,识别查询中的城市名,如“巴黎”,并将其添加到过滤器中,以减少向量搜索空间)。

检索优化

检索步骤可以通过两种基本方式进行优化:

  • 改进嵌入模型:在 RAG 摄取管道中使用的嵌入模型,用于编码分块文档,并在推理时转换用户的输入。
  • 利用数据库的过滤和搜索功能:此步骤仅在推理时使用,当需要基于用户输入检索最相似的文本块时使用。

这两种策略的最终目标一致:通过利用查询和已索引数据之间的语义相似性,增强向量检索步骤。

改进嵌入模型

在改进嵌入模型时,通常需要对预训练的嵌入模型进行微调,以使其适应特定领域的术语和细微差别,尤其是在术语不断变化或包含稀有词汇的领域。

如果不想对嵌入模型进行微调,可以利用Instructor模型Instructor模型示例)通过针对你领域的指令/提示来引导嵌入生成过程。使用此模型来定制你的嵌入网络以适应数据,可能是一个不错的选择,因为微调模型会消耗更多的计算和人工资源。

以下是一个使用Instructor模型嵌入AI相关文章标题的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
from InstructorEmbedding import INSTRUCTOR

# 初始化Instructor模型
model = INSTRUCTOR("hkunlp/instructor-base")

# 定义文章标题和指令
sentence = "RAG Fundamentals First"
instruction = "Represent the title of an article about AI:"

# 获取嵌入向量
embeddings = model.encode([[instruction, sentence]])
print(embeddings.shape) # 输出形状 (1, 768)

源代码可以在这里找到。

在运行Instructor代码之前,你需要创建并激活一个虚拟环境:

1
python3 -m venv instructor_venv && source instructor_venv/bin/activate

然后安装所需的Python依赖:

1
pip install sentence-transformers==2.2.2 InstructorEmbedding==1.0.1

利用经典数据库过滤和搜索功能提高检索

在检索优化的另一个方面,你可以通过利用经典的过滤和搜索数据库功能来提高检索效率:

  • 混合搜索 (Hybrid Search):这是一种结合向量搜索和关键词搜索的方法。关键词搜索擅长识别包含特定关键词的文档。当任务要求极高的精度,且检索的信息必须包括精确的关键词匹配时,混合搜索表现优秀。虽然向量搜索功能强大,但有时在寻找精确匹配时会遇到困难,但在寻找更广泛的语义相似性时却表现优异。通过将这两种方法结合,你可以利用关键词匹配和语义相似性。通常有一个叫做alpha的参数来控制两者之间的权重。算法会进行两次独立搜索,之后对结果进行标准化和统一。

  • 过滤向量搜索 (Filtered Vector Search):这种搜索方式利用元数据索引来过滤特定关键词的元数据。与混合搜索不同,你首先仅通过向量索引检索数据,然后在检索前或检索后进行过滤,以减少搜索空间。

实践中的检索优化

在实际应用中,通常从过滤向量搜索混合搜索开始,因为它们相对容易实现。这种方法让你可以根据性能灵活调整策略。如果结果不如预期,你始终可以微调嵌入模型以提高检索效果。