本章内容包括:
理解数据集管理
使用设计原则构建数据集管理服务
构建示例数据集管理服务
使用开源方法进行数据集管理
在我们对深度学习系统进行了一般性讨论之后,我们准备继续进行后续章节的学习,这些章节专注于这些系统中的特定组件。我们首先介绍数据集管理,不仅因为深度学习项目是以数据驱动的,而且因为我们希望提醒您在构建其他服务之前考虑数据管理的重要性。
在深度学习模型开发过程中,通常会忽视数据集管理(DM),而将注意力集中在数据处理、模型训练和模型服务上。数据工程中的一种普遍观点是,我们只需要良好的数据处理流水线,例如ETL(抽取、转换和加载)流水线。但是,如果在项目进行过程中避免管理数据集,数据收集和数据集使用逻辑会变得越来越复杂,模型性能的提升变得困难,最终整个项目会变慢。一个良好的DM系统可以通过将训练数据收集和使用解耦,加快模型开发速度;同时,它还通过对训练数据进行版本管理,实现模型的可重现性。
我们保证,在您的现有数据处理流水线之外,构建或至少设置一个数据集管理组件时,您会对自己明智的决策感到满意。并且在处理训练和模型服务组件之前构建它。您的深度学习项目开发将更快,并且能够在长期内产生更好的结果和更简单的模型。因为DM组件可以将上游数据的复杂性与模型训练代码隔离开来,使得模型算法开发和数据开发可以并行进行。
本章重点介绍了如何为深度学习项目构建数据集管理功能。由于深度学习算法、数据流水线和数据源的多样性,数据集管理是深度学习行业中经常讨论的主题。目前还没有统一的方法,并且似乎永远不会有一个统一的方法。因此,为了在实践中对您有所帮助,我们将专注于教授设计原则,而不是推崇一种单一的方法。在本章中构建的示例数据集管理服务展示了一种实现这些原则的可能方式。
在第2.1节中,您将了解为什么需要数据集管理,它应该解决哪些挑战,以及它在深度学习系统中的关键作用。我们还将介绍其关键设计原则,为下一节的具体示例做准备。
在第2.2节中,我们将根据第2.1节介绍的概念和设计原则,展示一个基于数据集管理的服务。首先,我们将在您的本地机器上设置该服务并进行实验。其次,我们将讨论内部数据集存储和数据模式、用户场景、数据摄取API和数据集获取API,以及设计和用户场景的概览。在这个过程中,我们还将讨论我们在服务设计中做出的一些重要决策的利弊。
在第2.3节中,我们将介绍两种开源方法。如果您不想自己构建数据集管理服务,您可以使用已经构建、可用和可适应的组件。例如,如果您的现有数据流水线是建立在Apache Spark之上,您可以使用Delta Lake和Petastorm来进行数据集管理。或者如果您的数据直接来自于AWS简单存储服务(S3)或Azure Blob,您可以采用Pachyderm。我们以图像数据集准备为例,展示这两种方法如何在实践中处理非结构化数据。通过本章的学习,您将对数据集管理的固有特性和设计原则有深入的了解,因此您可以自己构建一个数据集管理服务,或者在您的工作中改进现有的系统。
理解数据集管理服务
数据集管理组件或服务是一个专门用于组织数据以支持模型训练和模型性能故障排查的专用数据存储。它处理来自上游数据源的原始数据,并以一种明确定义的结构(数据集)返回用于模型训练的训练数据。图2.1显示了数据集管理服务提供的核心价值。在图中,我们可以看到数据集管理组件将原始数据转换为一致的数据格式,以便更好地支持模型训练,因此下游的模型训练应用可以专注于算法开发。
为什么深度学习系统需要数据集管理
在我们开始查看示例数据集管理服务之前,让我们花一些时间解释为什么数据集管理(DM)是任何深度学习系统中至关重要的一部分。这一部分非常重要,因为根据我们的经验,除非你完全理解“为什么”,否则不可能设计出解决实际问题的系统。
关于“为什么”的问题,有两个答案。首先,通过将训练数据的收集与数据消费解耦,DM可以加快模型的开发速度。其次,一个设计良好的DM服务通过对训练数据进行版本跟踪,支持模型的可重复性。让我们详细看一下这两个观点。
解耦训练数据收集和数据消费
如果你完全独立地进行深度学习项目的工作,项目开发流程是以下步骤的迭代循环:数据收集、数据预处理、训练和评估(参见图2.2)。尽管如果你在数据收集组件中改变数据格式,可能会破坏下游的数据预处理代码或训练代码,但这不是一个大问题。因为你是唯一的代码所有者,你可以自由地进行更改,不会影响其他人。
当我们构建一个严肃的深度学习平台,为数十个不同的深度学习项目提供支持,并向多个人和团队开放时,简单的数据流程图会迅速扩展成一个令人眼花缭乱的三维图表(图2.3)。
图2.3显示了一个企业深度学习开发环境的复杂性。在这种设置下,每个人只负责一个步骤,而不是整个工作流,并且他们为多个项目开发自己的工作。理想情况下,这个过程是高效的,因为人们通过专注于一个特定的问题来建立自己的专业知识。但是问题在于,通信成本通常被忽视。
当我们将工作流程的步骤(图2.2)分配给多个团队时,需要数据模式进行协调。如果没有数据契约,下游团队就不知道如何读取上游团队发送的数据。让我们回到图2.3。想象一下,如果有10个项目由四个团队并行开发,特别是如果每个团队处理工作流程的不同步骤,那么我们需要多少个数据模式来在团队之间进行沟通。
现在,如果我们想要向训练数据集添加一个新功能或属性(例如文本语言),我们需要召集每个团队,就新的数据格式达成共识,并实施更改。这是一项巨大的工作,因为在企业中进行跨团队协作是复杂的。要进行一个小的变更通常需要数月时间,因为每个团队都有自己的优先事项,你必须等待他们的待办事项清单。
情况更糟的是,深度学习模型的开发是一个迭代过程。为了提高模型的准确性,需要不断调整训练数据集(包括上游数据流水线)。这要求数据科学家、数据开发人员和平台开发人员高频率地进行交互,但由于跨团队的工作流程设置,数据迭代的速度很慢,这是在生产环境中模型开发如此缓慢的原因之一。
另一个问题是,当我们同时开发多种类型的项目(图像、视频和文本)时,数据模式的数量将会爆炸。如果让每个团队自由定义新的数据模式并且不加以适当管理,保持系统向后兼容几乎是不可能的。新的数据更新将变得越来越困难,因为我们必须花费额外的时间来确保新的数据更新不会破坏过去构建的项目。因此,项目开发速度将显著减慢。
为了解决迭代速度慢和数据模式管理问题,我们可以构建一个数据集管理服务。让我们看一下图2.4,以帮助确定引入数据集管理服务后项目开发工作流程的变化。
在图2.4中,我们看到一个数据集管理服务将模型开发工作流程分成了两个独立的空间:数据开发人员空间和数据科学家空间。
现在,长迭代循环(图2.2)被分成了两个小循环(图2.4),每个循环由一个团队负责,因此数据开发人员和数据科学家可以分别迭代数据收集和模型训练;因此,深度学习项目可以迭代得更快。
您可能还注意到,现在我们将所有数据模式放在一个地方:数据集管理服务,该服务管理每种数据集的两个强类型数据模式-摄入数据模式和训练数据模式。通过在数据摄入和训练过程中进行数据转换,确保上游数据收集中的数据更改不会破坏下游的模型训练。由于数据模式是强类型的,未来的数据升级可以轻松地保持向后兼容。
对于刚开始或实验阶段的项目,定义一个强类型数据集可能不是一个好主意,因为我们仍在探索各种数据选项。因此,我们还建议定义一个特殊的无模式限制的数据集类型,例如GENERIC类型。对于此数据集类型中的数据,DM将接受原始数据,不执行数据验证和转换(详细示例请参见2.2.6节)。从数据处理流水线收集到的数据可以直接由训练过程使用。尽管整个工作流程可能不够稳定,但无模式数据集类型满足了项目初期灵活性的需求。一旦项目成熟,我们可以创建强类型模式并为其定义数据集类型。
总结这一部分,管理数据集类型的两个数据模式是将数据科学家和数据开发人员解耦的关键。在2.2.6节中,我们将展示如何在示例数据集管理服务中实现这些模式。
实现模型可重现性
一个良好设计的数据集管理服务通过对训练数据集进行版本跟踪来支持模型的可重现性,例如使用版本字符串来获取之前模型训练运行中所使用的确切训练文件。模型可重现性对于数据科学家(模型算法开发者)的优势在于,您可以重复在特定数据集上运行深度学习算法(例如自注意力转换器在自然语言处理中),并获得相同或类似质量的结果。这被称为算法可重现性。
从深度学习系统开发者的角度来看,模型可重现性是算法可重现性的超集。它要求数据集管理系统能够重现其输出结果(数据集)。例如,我们需要获取确切的训练数据和训练配置,以复现过去训练的模型。
模型可重现性对机器学习项目至关重要,有两个主要原因。第一个原因是信任。可重现性为生成模型的系统创造了信任和可信度。对于任何系统,如果无法复现其输出,人们就不会相信该系统。这对于机器学习项目非常重要,因为应用程序将根据模型的输出做出决策,例如,一个聊天机器人将根据用户意图预测将用户的呼叫转接到适当的服务部门。如果我们无法复现一个模型,建立在该模型之上的应用程序就是非确定性的和不可信赖的。
第二个原因是,模型可重现性有助于性能故障排除。在检测到模型性能下降时,人们首先想知道训练数据集和训练算法代码发生了什么变化。如果不支持模型可重现性,性能故障排除将非常困难。
数据集管理的设计原则
在我们开始构建数据集管理(DM)系统之前,我们想要提出五个设计原则。 注意:我们认为这五个原则是本章最重要的要素。对于数据应用程序,我们在设计中遵循的原则比实际设计更重要。因为数据可以是任何形式的任何内容,通常没有数据存储的范式,也没有适用于所有数据处理用例的标准设计。因此,在实践中,我们通过遵循特定的通用原则来构建自己的数据应用程序。因此,这些原则至关重要。 这里的五个原则将为您构建新的DM服务或改进现有的DM服务提供明确的设计目标。
原则1:支持数据集可重现性以复现模型
数据集可重现性意味着DM始终返回与过去相同的训练示例。例如,当训练团队开始训练模型时,DM会提供一个带有版本字符串的数据集。每当训练团队或其他团队需要检索相同的训练数据时,可以使用该版本字符串查询DM以检索相同的训练数据。 我们认为所有的DM系统都应支持数据集的可重现性。更好的是还能提供数据差异功能,这样我们可以轻松查看两个不同数据集版本之间的数据差异。这对于故障排除非常方便。
原则2:提供统一的API接口适用于不同类型的数据集
深度学习的数据集可以是结构化的(文本,如销售记录或用户对话的转录)或非结构化的(图像、语音录音文件)。无论DM系统如何在内部处理和存储这些不同形式的数据,它都应该提供统一的API接口,用于上传和提取不同类型的数据集。API接口还将数据源与数据消费者分离;无论在底层发生什么变化,例如数据解析和内部存储格式的变化,下游消费者都不应受到影响。 因此,我们的用户,包括数据科学家和数据开发人员,只需要学习一个API来处理所有不同类型的数据集。这使得系统简单易用。此外,由于我们只公开一个公共API,代码维护成本将大大降低。
原则3:采用强类型的数据模式
强类型的数据模式是避免由数据变更引起的意外故障的关键。通过数据模式的强制执行,DM服务可以保证其摄取的原始数据和生成的训练数据与我们的规范一致。 强类型的数据模式作为一个安全保障,确保下游模型训练代码不受上游数据收集变更的影响,同时也确保了DM的上下游客户的向后兼容性。如果没有数据模式的保护,数据集的消费者——即下游模型训练代码——很容易受到上游数据变更的破坏。 数据模式也可以进行版本控制,但这会给管理带来另一层复杂性。另一个选择是每个数据集只有一个模式。在引入新的数据变更时,确保模式更新是向后兼容的。如果新的数据需求需要破坏性的变更,应该创建一个新的数据集类型和新的模式,而不是更新现有的模式。
原则4:确保API的一致性并在内部处理扩展
深度学习领域的当前趋势是随着数据集的不断增大,模型架构也越来越庞大。例如,GPT-3(用于语言理解的生成预训练转换器语言模型)使用超过250 TB的文本材料,包含数千亿个词;在特斯拉中,自动驾驶模型消耗了大量的以PB级计量的数据。另一方面,对于一些狭窄领域的简单任务,我们仍然使用较小的数据集(约50 MB),例如客户支持票据分类。数据集管理系统应该在内部处理数据扩展的挑战,并且对于大型和小型数据集,向用户(数据开发人员和数据科学家)暴露的API应该是一致的。
原则5:确保数据持久性
理想情况下,用于深度学习训练的数据集应该以不可变的方式存储,以便进行数据重现和故障排除。数据的删除应该是软删除,只有在某些情况下才会进行硬删除,例如当客户选择退出或取消账户时永久删除客户数据。
数据集的矛盾性特征
在我们对数据集管理的概念讨论中,我们想要澄清数据集的一个模糊方面。我们见过许多设计不良的数据集管理系统在这一点上失败了。
数据集具有一个矛盾的特征:它既是动态的又是静态的。从数据科学家的角度来看,数据集是静态的:它是一组固定的带有注释(也称为标签)的文件。从数据开发人员的角度来看,数据集是动态的:它是一个远程存储中的文件保存目标,我们不断向其中添加数据。
因此,从DM的角度来看,数据集应该是一个逻辑文件组,并满足数据收集和数据训练的需求。为了帮助您对如何适应数据集的动态和静态特性有一个具体的理解,让我们看一下图2.5。
我们可以从两个角度阅读图2.5:数据摄取和数据获取。首先,从数据摄取方面来看,我们可以看到数据收集流水线(图表左侧)不断地注入新的数据,例如文本话语和标签。例如,在时间点T0,数据集中创建了一个示例数据批次(示例批次T0)——时间点T1、T2和T3同样如此;随着时间的推移,我们总共创建了四个数据批次。因此,从数据开发人员的角度来看,这个数据集是可变的,因为流水线不断向其中添加数据。
其次,在训练数据获取方面(图表顶部),我们可以看到在获取训练数据时,DM同时从数据集中读取所有当前的数据。我们可以看到数据以静态的版本化快照的形式返回,其中包含一个版本字符串,用于唯一标识从数据集中选择的实际数据。例如,当我们在时间点T2从数据集中获取训练数据时,数据集包含了三个数据批次(批次T0、批次T1和批次T2)。我们将这三个数据批次打包成一个快照,分配一个版本字符串(“version1”),并将其作为训练数据返回。
从模型训练的角度来看,从DM获取的数据集是数据集的静态快照——一个经过时间过滤和客户逻辑过滤的数据集。静态快照对于模型的可重现性非常重要,因为它代表了训练运行中使用的确切训练文件。当我们需要重新构建模型时,可以使用快照版本字符串来找到过去模型训练中使用的快照。
我们已经全面介绍了理论内容,您应该能够理解数据集管理组件的需求、目标和独特特性。接下来的部分是一个具体的示例,介绍如何设计一个数据集管理服务。
参观一个示例数据集管理服务
在本节中,我们将带您了解一个示例的数据集管理(DM)服务。我们创建了这个示例,以便让您了解如何实现2.1.2节中介绍的原则。我们将首先在本地运行该服务,进行操作,然后查看其API设计和内部实现。
与示例服务进行操作
为了让您更容易上手,我们编写了七个Shell脚本,用于自动化整个DM实验室的过程。这些Shell脚本是在本节中体验演示场景的推荐方式,因为它们不仅自动化了服务的本地设置,还负责设置环境变量、准备示例数据和初始化本地网络。
您可以在github.com/orca3/MiniA…找到这些脚本,以”dm”开头。我们的GitHub仓库中的”function demo”文档(github.com/orca3/MiniA…)提供了完成实验和这些脚本的示例输出的详细说明。
注意:在运行”function demo”之前,应先满足系统要求。请参考github.com/orca3/MiniA…。
该实验室包括三个部分:首先,运行示例数据集管理服务;其次,创建一个数据集并将数据上传到其中;第三,从刚创建的数据集中获取训练数据。
在本地设置服务
示例服务使用Java 11编写。它使用MinIO作为文件blob服务器来模拟云对象存储(如Amazon S3),因此我们可以在本地运行而无需任何远程依赖。如果您已经在附录A中设置了实验室,您可以在终端的scripts文件夹根目录下运行以下命令(清单2.1)启动服务。
注意:强烈建议在运行DM演示脚本之前进行干净的设置。您可以执行./scripts/lab-999-tear-down.sh清理之前的实验。
# (1) Start minio server
./scripts/dm-001-start-minio.sh
# (2) start dataset management service, it will build ➥ the dm image and run the container. ./scripts/dm-002-start-server.sh
创建和更新语言意图数据集
我们的示例DM服务提供了三个API方法,供用户创建/更新数据集并检查结果。这些API方法包括CreateDataset、UpdateDataset和GetDatasetSummary。我们将在接下来的几节中详细讨论它们。
在这个示例场景中,我们首先调用数据管理服务上的CreateDataset API方法来创建一个新的语言意图数据集;然后我们使用UpdateDataset API方法向数据集追加更多数据。最后,我们使用GetDatasetSummary API方法获取数据集的统计信息和提交(数据变更)历史记录。
注意:脚本dm-003-create-dataset.sh和dm-004-add-commits.sh自动化了前面的步骤。请使用它们来运行演示场景。请注意,以下代码列表仅用于说明目的。
现在让我们运行实验。首先,我们将使用以下列表创建一个数据集。
mc -q cp data-management/src/test/resources/datasets/test.csv
➥ myminio/"${MINIO_DM_BUCKET}"/upload/001.csv
grpcurl -v -plaintext \
-d '{"name": "dataset-1", \
"dataset_type": "LANGUAGE_INTENT", \
"bucket": "mini-automl", \
"path": "{DATA_URL_IN_MINIO}"}' \
${DM_SERVER}:${DM_PORT} \
data_management.DataManagementService /CreateDataset
需要注意的是,CreateDataset API要求用户在gRPC请求中提供可下载的URL,而不是实际的数据,这就是为什么我们首先将001.csv文件上传到本地的MinIO服务器中。在数据集创建完成后,CreateDataset API将返回一个包含数据摘要和数据集提交历史记录的JSON对象。以下是一个示例结果:
{
"datasetId": "1",
"name": "dataset-1",
"dataset_type": "TEXT_INTENT",
"last_updated_at": "2021-10-09T23:44:00",
"commits" : [
{
"dataset_id": "1",
"commit_id": "1",
"created_at": "2021-10-09T23:44",
"commit_message" : "Initial commit",
"tags": [
{
"tag_key": "category",
"tag_value": "test set"
} ],
"path": "dataset/1/commit/1",
"statistics":{
"numExamples": "5500",
"numLabels": "151"
}
} ]
}
创建数据集后,您可以通过追加更多数据来不断更新它;以下是数据集更新的gRPC请求示例。
mc -q cp data-management/src/test/resources/datasets/train.csv myminio/"${MINIO_DM_BUCKET}"/upload/002.csv
grpcurl -v -plaintext \
-d '{"dataset_id": "1", \
"commit_message": "More training data", \ "bucket": "mini-automl", \
"path": "upload/002.csv" , \
"tags": [{ \
"tag_key": "category", \
"tag_value": "training set\"}]}' \
${DM_SERVER}:${DM_PORT} \
data_management.DataManagementService/UpdateDataset
一旦数据集更新完成,UpdateDataset API会以与CreateDataset API相同的方式返回一个数据摘要的JSON对象;以下是一个示例的响应对象:
{
{
"datasetId": "1",
"name": "dataset-1",
"dataset_type": "TEXT_INTENT",
"last_updated_at": "2021-10-09T23",
"commits": [
"commit_id": "1",
.. .. ..
}, {
} ],
"path": "dataset/1/commit/2",
"statistics": {
"numExamples": "7600",
"numLabels": "151"
}
} ]
"dataset_id": "1",
"commit_id": "2",
"created_at": "2021-10-09T23:59:17", "commit_message": "More training data", "tags": [
{
"tag_key": "category",
"tag_value": "training set"
} ],
"path": "dataset/1/commit/2",
"statistics": {
"numExamples": "7600",
"numLabels": "151"
}
}
]
}
您还可以使用GetDatasetSummary API获取数据集的数据摘要和提交历史记录。以下是一个示例的gRPC请求:
grpcurl -v -plaintext
-d '{"datasetId": "1"}' \
${DM_SERVER}:${DM_PORT} \ data_management.DataManagementService/GetDatasetSummary
获取训练数据集
现在我们已经创建了一个包含原始数据的数据集(ID = 1),让我们尝试从中构建一个训练数据集。在我们的示例服务中,这是一个两步的过程。
我们首先调用PrepareTrainingDataset API来启动数据集构建过程。然后我们使用FetchTrainingDataset API查询数据集准备进度,直到请求完成。
注意:脚本dm-005-prepare-dataset.sh、dm-006-prepare-partial-dataset.sh和dm-007-fetch-dataset-version.sh自动化了后续的步骤。请尝试使用它们来运行代码清单2.4和2.5中的示例数据集获取演示。
要使用PrepareTrainingDataset API,我们只需要提供一个数据集ID。如果您只想在训练数据集中包含一部分数据,可以在请求中使用标签作为过滤器。以下是一个示例请求。
grpcurl -plaintext \
-d “{“dataset_id”: “1”}” \
${DM_SERVER}:${DM_PORT} \
data_management.DataManagementService/PrepareTrainingDataset
grpcurl -plaintext \
-d “{“dataset_id”: “1”, \
“Tags”:[ \
{“tag_key”:”category”, \
“tag_value”:”training set”}]}” \
${DM_SERVER}:${DM_PORT} data_management.DataManagementService/PrepareTrainingDataset
一旦数据准备的gRPC请求成功,它将返回一个如下的JSON对象:
{
"dataset_id": "1",
"name": "dataset-1",
"dataset_type": "TEXT_INTENT", "last_updated_at": "2021-10-09T23:44:00",
"version_hash": "hashDg==",
"commits": [
{
"commit_id": "1",
.. .. ..
},
{
"commit_id": "2",
.. .. ..
}
]
}
在PrepareTrainingDataset API返回的数据中,包含一个名为”version_hash”的字符串。它用于标识API生成的数据快照。我们可以使用该哈希作为ID,调用FetchTrainingDataset API来跟踪构建训练数据集的进度。以下是一个示例:
-d "{"dataset_id": "1", \
"version_hash": \
"hashDg=="}" \
${DM_SERVER}:${DM_PORT}
data_management.DataManagementService/FetchTrainingDataset
FetchTrainingDatasetc API返回一个描述训练数据集的JSON对象。它告诉我们后台数据集构建过程的状态:RUNNING(运行中)、READY(准备就绪)或FAILED(失败)。如果训练数据准备就绪可以使用,响应对象将显示可下载的训练数据的URL列表。在此演示中,这些URL指向本地的MinIO服务器。以下是一个示例响应:
{
"dataset_id": "1",
"version_hash": "hashDg==",
"state": "READY",
"parts": [
{
"name": "examples.csv",
"bucket": "mini-automl-dm",
"path": "versionedDatasets/1/hashDg==/examples.csv"
},
{
"name": "labels.csv",
"bucket": "mini-automl-dm",
"path": "versionedDatasets/1/hashDg==/labels.csv"
}
],
"statistics": {
"numExamples": "16200",
"numLabels": "151"
}
}
做得好!您刚刚体验了我们示例数据集管理服务提供的所有主要数据API。通过尝试自己上传数据和构建训练数据集,希望您对如何使用这项服务有了一定的了解。在接下来的几节中,我们将介绍用户场景、服务架构概述和示例数据集管理服务的代码实现。
注意:如果您在运行上述脚本时遇到任何问题,请参考我们GitHub仓库中的“function demo”文档中的说明。此外,如果您想尝试第3章和第4章的实验,请保持容器运行,因为它们是模型训练实验的先决条件。
用户、用户场景和整体情况
在设计后端服务时,我们发现一种非常有用的方法是从外部到内部思考。首先,确定用户是谁,服务将提供什么价值,以及客户将如何与服务进行交互。然后,内部逻辑和存储布局应该自然而然地出现在您的脑海中。在介绍这个示例DM服务时,我们将使用相同的方法向您展示。所以让我们先看看我们的用户和用户场景。
注意:我们首先关注使用案例的原因是我们认为任何系统设计应该最关注用户。如果我们确定了客户如何使用系统,我们的效率和可扩展性的方法将自然浮现出来。如果设计按照相反的顺序进行(首先考虑技术,其次考虑可用性),那么系统往往会使用起来很笨拙,因为它是为技术而设计的,而不是为客户而设计的。
用户和用户场景
我们的示例DM服务是为两个虚构的用户而设计的:Jianguo,一名数据工程师,和Julia,一名数据科学家。他们共同合作训练一个语言意图分类模型。Jianguo负责训练数据的收集工作。他不断从不同的数据源收集数据(例如解析用户活动日志和进行客户调查)并对其进行标注。Jianguo使用DM的数据摄取API来创建数据集、向现有数据集添加新数据,并查询数据集的摘要和状态。
Julia使用Jianguo构建的数据集来训练意图分类模型(通常使用PyTorch或Tensorflow编写)。在训练时,Julia的训练代码首先调用DM服务的获取训练数据API从DM中获取训练数据,然后开始训练过程。
服务的整体架构
我们的示例DM服务分为三个层级:数据摄取层、数据集获取层和数据集内部存储层。数据摄取API集用于Jianguo上传新的训练数据和查询数据集状态。数据集获取API用于Julia获取训练数据集。请参考图2.6和图2.7以了解整体情况。
图2.6中的中心大框显示了我们示例数据集管理服务的整体设计。它具有一个内部数据集存储系统和两个面向公众的接口:数据摄取API和数据集获取API,一个用于数据摄取,另一个用于数据集获取。该系统支持强类型模式数据集(文本和图像类型)和非模式数据集(通用类型)。
图2.7展示了示例DM服务用于存储数据集的整体数据结构。提交由数据摄取API创建,版本化快照由数据集获取API创建。引入提交和版本化快照的概念是为了解决数据集的动态和静态特性。我们将在第2.2.5节中详细讨论存储。
在接下来的子章节中,我们将逐个组件地详细介绍前面两个图表的每个细节。我们首先从API开始,然后转向内部存储和数据模式。
数据摄取API
数据摄取API允许在示例数据集管理服务中创建、更新和查询数据集。图2.8中的灰色框显示了数据摄取层中支持将数据摄入DM的四个服务方法的定义。它们的名称很直观,让我们看一下它们在清单2.6中的gRPC方法定义。
注意:为了减少样板代码,我们选择了gRPC来实现示例DM服务的公共接口。这并不意味着gRPC是数据集管理服务的最佳方法,但与RESTful接口相比,gRPC的简洁编码风格非常适合演示我们的思路,而无需让您接触到不必要的Spring Framework细节。
数据摄取方法的定义
让我们来看一下我们示例数据摄取API的定义。
# create a new dataset and save data into it
rpc CreateDataset (CreateDatasetRequest) returns (DatasetSummary);
# add new data to an existing dataset
rpc UpdateDataset (CreateCommitRequest) returns (DatasetSummary);
# get summary and history of a given dataset
rpc GetDatasetSummary (DatasetPointer) returns (DatasetSummary);
# list all existing datasets’ summary
rpc ListDatasets (ListQueryOptions) returns (stream DatasetSummary);
message CreateDatasetRequest {
string name = 1;
string description = 2;
DatasetType dataset_type = 3;
string bucket = 4;
string path = 5;
repeated Tag tags = 6;
}
注意:本示例服务未涉及数据删除和修改的主题,但可以很容易地扩展服务以支持这些功能。
数据URL与数据流式传输
在我们的API设计中,您可能会注意到我们要求用户提供数据URL作为原始数据输入,而不是直接将文件上传到我们的服务中。在第2.2.4节中,我们还选择返回数据URL作为训练数据集,而不是通过流式传输端点直接返回文件。主要原因是我们希望将文件传输工作卸载给云对象存储服务,例如Amazon S3或Azure Blob。这样做有两个好处:首先,它节省了网络带宽,因为客户端和服务之间没有实际的文件传输;其次,它减少了代码复杂性,因为在文件较大且API使用频率较高时,保持数据流式传输的高可用性可能很复杂。
创建新数据集
让我们看一下gRPC CreateDataset方法是如何实现的。在调用DM(createDataset API)创建数据集之前,用户(Jianguo)需要为他们想要上传的数据准备一个可下载的URL(步骤1和步骤2);该URL可以是云对象存储服务(如Amazon S3或Azure Blob)中的可下载链接。在我们的示例服务中,我们使用MinIO服务器在本地运行以模拟Amazon S3。Jianguo还可以在数据集创建请求中指定数据集的名称和标签。清单2.7突出显示了实现图2.9中所示的工作流程的关键代码片段(dataManagement/DataManagementService.java)。
public void createDataset(CreateDatasetRequest request) {
int datasetId = store.datasetIdSeed.incrementAndGet();
Dataset dataset = new Dataset(datasetId, request.getName(),request.getDescription(),request.getDatasetType());
int commitId = dataset.getNextCommitId();
CommitInfo.Builder builder = DatasetIngestion.ingest(minioClient, datasetId, commitId,request.getDatasetType(), request.getBucket(),request.getPath(), config.minioBucketName);
store.datasets.put(Integer.toString(datasetId), dataset);
dataset.commits.put(commitId, builder.setCommitMessage("Initial commit").addAllTags(request.getTagsList()).build());
responseObserver.onNext(dataset.toDatasetSummary());
responseObserver.onCompleted();
}
DatasetIngestion.ingest()方法的实现细节将在第2.2.5节中进行讨论。
更新现有数据集
深度学习模型的开发是一个持续的过程。一旦我们为模型训练项目创建了一个数据集,数据工程师(例如Jianguo)将会持续添加数据。为了满足这个需求,我们提供了UpdateDataset API。
要使用UpdateDataset API,我们需要为新数据准备一个数据URL。我们还可以传递一个提交消息和一些自定义标签来描述数据的更改;这些元数据对于数据历史跟踪和数据筛选非常有用。
数据集更新的工作流程与数据集创建的工作流程几乎相同(图2.9)。它使用给定的数据创建一个新的提交,并将该提交附加到数据集的提交列表中。唯一的区别是数据集更新工作流程不会创建一个新的数据集,而是在现有数据集上进行操作。请参考以下代码清单。
注意:因为每次数据集更新都被保存为一个提交,所以如果有需要,我们可以通过一些数据集管理API轻松地删除或软删除这些提交。由于篇幅限制,这些管理API并未在此处讨论。
创建新数据集的七个步骤的高级概述:(1)将数据上传到云端 Jianguo错误地向一个数据集中上传了一些标注错误的数据。由于空间限制,这些管理API没有被讨论。
public void updateDataset(CreateCommitRequest request) {
String datasetId = request.getDatasetId();
Dataset dataset = store.datasets
.get(datasetId);
String commitId = Integer.toString(dataset
.getNextCommitId());
// the rest code are the same as listing 2.7
.. .. .. }
在第2.2.3节中,我们将更详细地讨论提交的概念。目前,您只需要知道每个数据集更新请求都会创建一个新的提交对象。
注意:为什么要将数据更新保存在提交中?我们能否将新数据与当前数据合并,只存储最新状态?在我们的数据集更新实现中,每次调用UpdateDataset API时,我们都会创建一个新的提交。我们避免原地数据合并的原因有两个:首先,原地数据合并可能导致不可逆的数据修改和悄悄的数据丢失。其次,为了重现过去使用的训练数据集,我们需要确保DM接收到的数据批次被以不可变的方式存储,因为它们是我们随时用于创建训练数据集的源数据。
列出数据集和获取数据集摘要
除了CreateDataset和UpdateDataset API之外,我们的用户还需要方法来列出现有的数据集并查询数据集的概览,例如数据集的示例和标签数量以及审计历史。为了满足这些需求,我们构建了两个API:ListDatasets和GetDatasetSummary。第一个API可以列出所有现有的数据集,而第二个API提供了关于数据集的详细信息,如提交历史、示例和标签计数、数据集ID和类型。这两个API的实现很简单,您可以在我们的Git存储库(miniAutoML/DataManagementService.java)中找到它们。
训练数据集获取API
在本节中,我们将介绍数据集获取层,它在图2.10中被标识为灰色方框。为了构建训练数据,我们设计了两个API。首先,数据科学家(Julia)调用PrepareTrainingDataset API发起训练数据准备请求;我们的DM服务将启动一个后台线程来开始构建训练数据,并返回一个版本字符串作为训练数据的参考句柄。接下来,如果后台线程已完成,Julia可以调用FetchTrainingDataset API来获取训练数据。
数据即获取方法的定义
首先,让我们看一下两个数据集获取方法(PrepareTrainingDataset和FetchTrainingDataset)的gRPC服务方法定义(grpc-contract/src/main/proto/data_management.proto)。
rpc PrepareTrainingDataset (DatasetQuery) returns (DatasetVersionHash);
rpc FetchTrainingDataset (VersionHashQuery) returns (VersionHashDataset);
message DatasetQuery{
string dataset_id = 1;
string commit_id = 2;
repeated Tag tags = 3;
}
message VersionHashQuery {
string dataset_id = 1;
string version_hash = 2;
}
发送准备训练数据集请求
现在让我们来看一下PrepareTrainingDataset API的代码工作流程。图2.11展示了我们示例服务如何处理Julia的准备训练数据集请求。
当DM收到数据集准备请求时(图2.11,步骤1),它执行以下三个操作:
- 尝试在其存储中查找具有给定数据集ID的数据集。
- 应用给定的数据过滤器以从数据集中选择提交记录。
- 创建一个versionedSnapshot对象,用于在其内部存储(versionHashRegistry)中跟踪训练数据。versionedSnapshot对象的ID是从所选提交记录的ID列表生成的哈希字符串。
versionedSnapshot对象是Julia需要的训练数据集;它是从所选提交记录中提取的一组不可变的静态文件。Julia可以使用步骤3返回的哈希字符串(快照ID)查询数据集准备状态,并在训练数据集就绪时获取可下载的数据URL。借助该版本字符串,Julia可以始终从未来的任何时间获取相同的训练数据(versionedSnapshot),这就是支持数据集可复现性的方式。 versionedSnapshot的一个附带好处是它可以在不同的PrepareTrainingDataset API调用之间用作缓存。如果快照ID(一组提交的哈希字符串)已经存在,我们会返回现有的versionedSnapshot,而不需要重新构建相同的数据,这可以节省计算时间和网络带宽。
注意:在我们的设计中,数据过滤发生在提交级别,而不是在单个示例级别;例如,在准备请求中使用过滤器标签”DataType=Training”表示用户只希望从标记为”DataType=Training”的提交中获取数据。
在步骤3之后,DM将生成一个后台线程来构建训练数据集。在后台作业中,DM将从MinIO服务器下载每个数据集提交的文件到本地,将它们聚合并以预定义的格式压缩成一个文件,然后将其上传回MinIO服务器的不同存储桶中(步骤6和7)。接下来,DM将将实际训练数据的数据URL放入versionedSnapshot对象,并将其状态更新为”READY”(步骤8)。现在,Julia可以从返回的versionedSnapshot对象中找到数据URL,并开始下载训练数据。
我们还没有涉及的是数据模式。在数据集管理服务中,我们以两种不同的数据格式保存摄入的数据(提交)和生成的训练数据(versionedSnapshot)。数据合并操作(图2.11,步骤6和7)将原始摄入数据(所选提交记录)聚合并转换为意图分类训练数据模式中的训练数据。我们将在2.2.6节中详细讨论数据模式。清单2.10突出显示了图2.11中实现的代码。
public void prepareTrainingDataset(DatasetQuery request) {
# step 1, receive dataset preparation request
Dataset dataset = store.datasets.get(datasetId);
String commitId;
.. .. ..
# step 2, select data commits by checking tag filter
BitSet pickedCommits = new BitSet();
List<DatasetPart> parts = Lists.newArrayList();
List<CommitInfo> commitInfoList = Lists.newLinkedList();
for (int i = 1; i <= Integer.parseInt(commitId); i++) {
CommitInfo commit = dataset.commits.get(Integer.toString(i));
boolean matched = true;
for (Tag tag : request.getTagsList()) {
matched &= commit.getTagsList().stream().anyMatch(k -> k.equals(tag));
}
if (!matched) {
continue;
}
pickedCommits.set(i);
commitInfoList.add(commit);
.. .. ..
}
# step 3, generate version hash from the selected commits list
String versionHash = String.format("hash%s",Base64.getEncoder().encodeToString(pickedCommits.toByteArray()));
if (!dataset.versionHashRegistry.containsKey(versionHash)) {
dataset.versionHashRegistry.put(versionHash,
VersionedSnapshot.newBuilder()
.setDatasetId(datasetId) .setVersionHash(versionHash) .setState(SnapshotState.RUNNING).build());
# step 5,6,7,8, start a background thread to aggregate data # from commits to the training dataset
threadPool.submit(
new DatasetCompressor(minioClient, store, datasetId, dataset.getDatasetType(), parts, versionHash,
config.minioBucketName));
}
# step 4, return hash version string to customer
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
获取训练数据集
一旦DM服务收到了prepareTrainingDataset API上的训练数据集准备请求,它将生成一个后台作业来构建训练数据,并返回一个version_hash字符串以便进行跟踪。Julia可以使用FetchTrainingDataset API和version_hash字符串来查询数据集构建进度,并最终获取训练数据集。
图2.12展示了DM中如何处理数据集获取请求。 获取训练数据集实质上是查询训练数据准备请求的状态。对于每个数据集,DM服务创建一个versionedSnapshot对象,用于跟踪由prepareTrainingDataset请求生成的每个训练数据集。
当用户发送获取数据集查询时,我们只需使用请求中的哈希字符串在数据集的训练快照(versionHashRegistry)中搜索相应的versionedSnapshot对象,并将其返回给用户(如果存在)。versionedSnapshot对象将通过后台训练数据处理作业(图2.11,步骤5-8)持续更新。当作业完成时,它将将训练数据的URL写入versionedSnapshot对象;因此,用户最终获取到训练数据。请参阅以下清单中的代码实现。
public void fetchTrainingDataset(VersionQuery request) {
String datasetId = request.getDatasetId();
Dataset dataset = store.datasets.get(datasetId);
if (dataset.versionHashRegistry.containsKey(request.getVersionHash())) {
responseObserver.onNext(dataset.versionHashRegistry.get(request.getVersionHash()));
responseObserver.onCompleted();
}
.. .. ..
}
内部数据集存储
示例服务的内部存储简单地是一个内存中的数据集对象列表。之前我们讨论过数据集既可以是动态的也可以是静态的。一方面,数据集是一个逻辑文件组,随着不断从各种来源吸收新数据而动态变化。另一方面,它对于训练是静态且可复现的。
为了展示这个概念,我们设计了每个数据集包含一个提交(commit)列表和一个versionedSnapshot列表。提交代表动态摄入的数据:通过数据摄入调用(CreateDataset或UpdateDataset)添加的数据;提交还具有标签和注释目的的消息。versionedSnapshot代表静态训练数据,它由准备训练数据集请求(PrepareTrainingDataset)从选定的提交列表转换而来。每个快照与一个版本相关联;一旦构建训练数据集,您可以随时使用该版本字符串获取相应的训练数据(FetchTrainingDataset)进行重用。图2.13可视化了数据集的内部存储结构。
注意虽然不同类型的数据集的各个训练示例可以以不同的形式存在,比如图像、音频和文本句子,但数据集的操作(创建、更新和查询数据集摘要)以及其动态/静态特性是相同的。因为我们在所有数据集类型之间设计了一套统一的API集,所以我们可以使用统一的存储结构来存储各种不同类型的数据集。
在我们的存储中,实际的文件(提交数据、快照数据)存储在云对象存储(如Amazon S3)中,我们只在DM系统中保留数据集的元数据(稍后解释)。通过卸载文件存储工作并仅跟踪文件链接,我们可以专注于组织数据集并跟踪其元数据,例如编辑历史、数据统计、训练快照和所有权。
到目前为止,我们已经讨论了数据集存储的概念,但实际的数据集写入和读取是如何工作的?我们如何为不同的数据集类型(如GENERIC和TEXT_INTENT类型)序列化提交和快照?
在存储后端实现中,我们使用了一个简单的继承概念来处理不同数据集类型的文件操作。我们定义了一个DatasetTransformer接口,其中包含以下函数:ingest()函数将输入数据保存到内部存储中作为一个提交,compress()函数将选定的提交中的数据合并成一个版本快照(训练数据)。
具体来说,对于”TEXT_INTENT”类型的数据集,我们有IntentTextTransformer来应用文件转换的强类型文件模式。对于”GENERIC”类型的数据集,我们有GenericTransformer来以原始格式保存数据,不进行任何检查或格式转换。图2.14说明了这些情况。
从图2.14可以看出,数据摄入API(2.2.3节)中的原始意图分类数据由IntentTextTransformer的Ingest()函数保存为一个提交;训练数据集获取API(2.2.4节)生成的意图分类训练数据由IntentTextTransformer的Compress()函数保存为一个版本快照。因为它们是普通的Java代码,我们留给您自己去发现;您可以在我们的Git存储库中找到实现代码(org/orca3/miniAutoML/dataManagement/transformers/IntentTextTransformer.java)。
数据模式
到目前为止,我们已经看到了所有的API、工作流程和内部存储结构。现在让我们考虑一下DM服务中的数据是什么样子的。对于每一种强类型的数据集,比如”TEXT_INTENT”数据集,我们定义了两种数据模式:一种用于数据摄入,一种用于训练数据获取(图2.15)。
图2.15展示了DM服务如何使用两种数据模式来实现其数据契约。步骤1使用数据摄入模式验证原始输入数据;步骤2使用训练数据模式将原始数据转换为训练数据格式;步骤3将转换后的数据保存为提交;步骤4在构建训练数据集时将选定的提交合并为一个版本快照,但仍遵循训练数据模式。
这两种不同的数据模式是DM服务提供给我们的两个不同用户(Jianguo和Julia)的数据契约。无论Jianguo如何收集数据,都需要将其转换为数据摄入模式的格式插入到DM中。另外,由于DM保证输出的训练数据遵循训练数据模式,Julia可以放心使用数据集,而不用担心受到Jianguo进行的数据收集更改的影响。
数据摄入模式
我们已经了解了数据模式的概念,现在让我们来看一下我们为TEXT_INTENT数据集定义的数据摄入模式:
>> TEXT_INTENT dataset ingestion data schema
<text utterance>, <label>,<label>,<label>, ...
为了简化起见,我们的数据摄入模式要求TEXT_INTENT数据集的所有输入数据必须以CSV文件格式提供。第一列是文本对话,其余列是标签。以下是一个示例CSV文件:
“I am still waiting on my credit card”, activate_my_card
➥ ;card_arrival
“I couldn’t purchase gas in Costco”, card_not_working
**训练数据集模式 **
对于TEXT_INTENT训练数据,我们的模式将输出数据定义为包含两个文件的压缩文件:examples.csv和labels.csv。labels.csv定义了标签名称到标签ID的映射,而examples.csv定义了训练文本(对话)到标签ID的映射关系。请参考以下示例:
examples.csv: <text utterance>, <label_id>,<label_id>, ...
“I am still waiting on my credit card”, 0;1
“I couldn’t purchase gas in Costco”, 2
Labels.csv: <label_id>, <label_name>
0, activate_my_card
1, card_arrival
2, card_not_working
在一个数据集中拥有两个强类型的数据模式的好处
在一个数据集中拥有两个强类型的数据模式,并让DM将数据从数据摄入格式转换为训练数据格式,我们可以并行进行数据收集开发和训练代码开发。例如,当Jianguo想要向TEXT_INTENT数据集添加一个新特性——”文本语言”时,他可以与DM服务开发人员合作,更新数据摄入模式以添加一个新的数据字段。 Julia不会受到影响,因为训练数据模式没有发生改变。当Julia有空闲时间来在训练代码中使用新特性时,她可以稍后与我们联系以更新训练数据模式。关键是,Jianguo和Julia不需要同步工作来引入新的数据集增强;他们可以独立地工作。
注意:为了简单和演示目的,我们选择使用CSV文件来存储数据。使用纯粹的CSV文件的问题在于它们缺乏向后兼容支持和数据类型验证支持。在生产环境中,我们建议使用Parquet、Google protobuf或Avro来定义数据模式和存储数据。它们附带了一套用于数据验证、数据序列化和模式向后兼容支持的库。
通用数据集:无模式的数据集
尽管我们在多个地方强调定义强类型数据集模式对于数据集管理服务是基本的,但我们在这里将做一个例外,添加一个自由格式的数据集类型——通用(GENERIC)数据集。与强类型的TEXT_INTENT数据集不同,通用类型的数据集没有数据模式验证。我们的服务将按原样保存任何原始输入数据,并在构建训练数据时,服务只是将所有原始数据以其原始格式打包到一个训练数据集中。
通用数据集类型听起来可能不是一个好主意,因为我们基本上会将从上游数据源接收到的任何数据直接传递给下游的训练应用程序,这很容易破坏训练代码中的数据解析逻辑。这绝对不是一个用于生产的选项,但它为实验项目提供了所需的灵活性。
尽管强类型的数据模式提供了良好的数据类型安全保护,但它也需要维护的代价。当您不得不频繁地在DM服务中进行模式更改以适应新的实验所需的数据格式时,这是相当烦人的。
在深度学习项目开始时,有很多事情是不确定的,比如哪种深度学习算法最有效,我们可以收集什么样的数据,以及应该选择什么样的数据模式。为了在所有这些不确定性中前进,我们需要一种灵活的方式来处理任意数据,以便进行模型训练实验。这就是通用数据集类型设计的目的。
一旦业务价值得到证明并选择了深度学习算法,我们就清楚了训练数据的样式,然后就是在数据集管理服务中定义一个强类型的数据集。在接下来的章节中,我们将讨论如何添加一个新的强类型数据集。
添加新的数据集类型(IMAGE_CLASS)
让我们想象一下,有一天Julia向我们(平台开发人员)提出要将她的实验性图像分类项目提升为正式项目。Julia和她的团队正在使用通用数据集开发图像分类模型,并且由于取得了良好的结果,他们现在希望定义一个强类型的数据集(IMAGE_CLASS)来稳定原始数据收集和训练数据使用的数据模式。这将保护训练代码免受未来数据集更新的影响。
要添加一个新的数据集类型(IMAGE_CLASS),我们可以按照以下三个步骤进行。首先,我们必须定义训练数据的格式。在与Julia讨论后,我们决定由FetchTrainingDataset API生成的训练数据将是一个压缩文件;它将包含以下三个文件:
>> examples.csv: <image filename>,<label id>
“imageA.jpg”, 0
“imageB.jpg”, 1
“imageC.jpg”, 0
>> labels.csv: <label id>,<label name>
0, cat
1, dog
>> examples/ - folder
imageA.jpg
imageB.jpg
imageC.jpg
examples.csv和labels.csv是定义每个训练图像标签的清单文件。实际的图像文件存储在examples文件夹中。
第二步是定义数据摄入格式。我们需要与负责收集图像并为其打标签的数据工程师Jianguo讨论数据摄入模式。我们达成了一致,每个CreateDataset和UpdateDataset请求的有效负载数据也是一个压缩文件;其目录结构如下所示:压缩文件应该是一个只包含子目录的文件夹。根文件夹下的每个子目录代表一个标签;其中的图像属于该标签。子目录应仅包含图像,而不包含任何嵌套目录:
├── cat
│ ├── catA.jpg
│ ├── catB.jpg
│ └── catC.jpg
└── dog
├── dogA.jpg
├── dogB.jpg
└── dogC.jpg
最后一步是代码更改。在了解了两种数据模式之后,我们需要创建一个实现DatasetTransformer接口的ImageClassTransformer类,以构建数据读取和写入的逻辑。
首先,我们实现ImageClassTransformer.ingest()函数。该逻辑需要使用在第二步中定义的输入数据格式来解析数据集创建和更新请求中的输入数据,然后将输入数据转换为训练数据格式并将其保存为数据集的提交。
接下来,我们实现ImageClassTransformer.compress()函数,该函数首先通过匹配数据筛选器来选择提交,然后将匹配的提交合并为单个训练快照。最后一步是将ImageClassTransformer.ingest()函数注册到DatasetIngestion.ingestion()函数中,类型为IMAGE_CLASS,并将ImageClassTransformer.compress()函数注册到DatasetCompressor.run()中,类型为IMAGE_CLASS。
正如您所见,通过适当的数据集结构,我们可以通过添加一些新的代码片段来支持新的数据集类型。现有的数据集类型和公共数据摄入和获取API不受影响。
服务设计回顾
让我们回顾一下这个示例数据集管理服务如何解决第2.1.2节中介绍的五个设计原则:
-
原则1:支持数据集可复现性。我们的示例DM服务将所有生成的训练数据保存为带有版本哈希字符串的版本快照,并将该版本字符串作为键。用户可以使用该版本字符串随时获取训练数据快照。
-
原则2:为不同的数据集类型提供统一的体验。数据摄入API和训练数据获取API对所有数据集类型和大小都采用相同的方式工作。
-
原则3:采用强类型数据模式。我们的示例数据集类型TEXT_INTENT和IMAGE_CLASS都对原始摄入数据和训练数据应用了自定义的数据模式。
-
原则4:确保API的一致性并在内部处理扩展。尽管我们的示例代码将所有数据集的元数据保存在内存中(为了简单起见),我们可以很容易地将数据集存储结构实现在云对象存储中;从理论上讲,它具有无限的容量。此外,我们要求使用数据URL发送数据和返回数据,因此无论数据集有多大,我们的API保持一致。
-
原则5:保证数据持久性。每个数据集创建请求和更新请求都会创建一个新的提交;每个训练数据准备请求都会创建一个版本化的快照。提交和快照都是不可变的,并且没有数据过期限制。
注意:为了保持简单,我们从示例数据集管理服务中删除了许多重要功能。例如,管理API允许您删除数据、恢复数据提交和查看数据审计历史记录。您可以随意分叉存储库并尝试实现它们。
开源的方式
如果您有兴趣采用开源方法来建立数据集管理功能,我们为您选择了两种方法:Delta Lake和Pachyderm。让我们分别看看它们。
Delta Lake与Apache Spark家族中的Petastorm
在这种方法中,我们建议将数据保存在Delta Lake表中,并使用Petastorm库将表数据转换为PyTorch和TensorFlow的数据集对象。这样,数据集可以在训练代码中无缝使用。
Delta Lake
Delta Lake是一种存储层,为Apache Spark和其他云对象存储(如Amazon S3)提供可扩展的、ACID(原子性、一致性、隔离性、持久性)事务支持。Delta Lake由Databricks开发为开源项目,Databricks是一家备受尊敬的数据和人工智能公司。
云存储服务(如Amazon S3)是IT行业中最具可扩展性和成本效益的存储系统之一。它们是构建大型数据仓库的理想位置,但其键值存储设计使得实现ACID事务和高性能变得困难。元数据操作(例如列举对象)是昂贵的,并且一致性保证有限。
Delta Lake旨在填补先前讨论的这些差距。它作为一个文件系统,将批处理和流式数据存储在对象存储中(如Amazon S3)。此外,Delta Lake管理其表结构和模式强制执行的元数据、缓存和索引。它提供ACID属性、时间旅行功能以及大型表格数据集的显著更快的元数据操作。请参见图2.16,了解Delta Lake的概念图。
Delta Lake表是系统的核心概念。在使用Delta Lake时,通常会处理Delta Lake表。它们类似于SQL表,可以查询、插入、更新和合并表内容。Delta Lake的模式保护是其优点之一。它支持在写入表时进行模式验证,以防止数据污染。它还跟踪表的历史,因此您可以将表回滚到其过去的任何阶段(称为时间旅行)。
对于构建数据处理流水线,Delta Lake建议按照铜、银、金三个类别为表命名。首先,我们使用铜表保存来自不同来源的原始输入数据(其中一些可能不太干净)。然后,数据不断从铜表流向银表,经过数据清洗和转换(ETL)处理。
最后,我们对数据进行过滤和净化,并将结果保存到金表中。每个表处于机器学习状态;它们具有可复现性和类型安全性。
为什么Delta Lake是管理深度学习项目数据集的良好选择
以下是使Delta Lake成为管理深度学习项目数据集的良好选择的三个特点。
首先,Delta Lake支持数据集的可复现性。它具有“时间旅行”功能,可以使用数据版本控制查询数据在某个特定时间点的状态。假设您已经设置了一个持续运行的ETL流水线,以使训练数据集(金表)保持最新。因为Delta Lake将表的更新跟踪为快照,每个操作在流水线写入数据集时自动生成版本。这意味着所有的训练数据快照都可以免费保存,并且您可以浏览表的更新历史记录并轻松地回滚到过去的阶段。以下代码提供了一些示例命令。
pathToTable = "/my/sample/text/intent/dataset/A"
deltaTable = DeltaTable.forPath(spark, pathToTable)
fullHistoryDF = deltaTable.history()
lastOperationDF = deltaTable.history(1)
df = spark.read.format("delta")
.option("timestampAsOf", "2021-07-01")
.load(pathToTable)
df = spark.read.format("delta")
.option("versionAsOf", "12")
.load(pathToTable)
其次,Delta Lake支持持续的流式数据处理。它的表可以无缝处理来自历史和实时流式数据源的连续数据流。例如,您的数据流水线或流数据源可以在向Delta Lake表中添加数据的同时从表中查询数据。这样可以节省您在编写代码时将新数据与现有数据合并的额外步骤。
第三,Delta Lake提供模式强制和演化。它在写入时应用模式验证。它将确保新数据记录与表的预定义模式匹配;如果新数据与表的模式不兼容,Delta Lake将引发异常。在写入时进行数据类型验证比在读取时进行更好,因为如果数据被污染,清洗数据就变得困难。
除了强大的模式强制执行外,Delta Lake还允许您向现有数据表添加新列,而不会引起破坏性的更改。数据集模式强制执行和调整(演化)的能力对于深度学习项目至关重要。这些能力保护训练数据不受意外数据写入的污染,并提供安全的数据更新。
PETASTORM
Petastorm是由Uber ATG(高级技术组)开发的开源数据访问库。它可以直接从Apache Parquet格式的数据集中进行单机或分布式深度学习模型的训练和评估(Parquet是一种为高效数据存储和检索而设计的数据文件格式)。
Petastorm可以轻松地将Delta Lake表转换为TensorFlow和PyTorch格式的数据集,并且还支持分布式训练数据分区。借助Petastorm,可以简单地使用Delta Lake表中的训练数据进行下游训练应用,无需担心特定训练框架的数据转换细节。它还在数据集格式和训练框架(TensorFlow、PyTorch和PySpark)之间创建了良好的隔离。图2.17可视化了数据转换过程。
图2.17描述了Petastorm数据转换的工作流程。您可以创建一个Petastorm Spark转换器,将Delta Lake表作为Parquet文件读入其缓存,并生成TensorFlow或PyTorch数据集。
示例:为花卉图像分类准备训练数据 现在我们对Delta Lake和Petastorm有了一个大致的了解,让我们看一个具体的模型训练示例。下面的代码片段(代码清单2.13和2.14)展示了一个端到端的图像分类模型训练工作流程,分为两个步骤。首先,它们定义了一个图像处理ETL流水线,将一组图像文件解析为Delta Lake表作为图像数据集。其次,它们使用Petastorm将Delta Lake表转换为可以直接加载到PyTorch框架中进行模型训练的数据集。
让我们首先查看代码清单2.13中的四步ETL数据处理流水线。您还可以在 mng.bz/JVPz 找到完整的代码。
在流水线的开始步骤中,我们将图像从文件夹flower_photos加载到Spark作为二进制文件。其次,我们定义了提取函数,以获取每个图像文件的元数据,例如标签名称、文件大小和图像大小。第三,我们使用提取函数构建数据处理流水线,然后将图像文件传递给流水线,它将生成一个数据帧。数据帧的每一行表示一个图像文件及其元数据,包括文件内容、标签名称、图像大小和文件路径。在最后一步,我们将这个数据帧保存为Delta Lake表gold_table_training_dataset。您还可以在以下代码清单的末尾看到该Delta Lake表的数据模式。
## Step 1: load all raw images files
path_labeled_rawdata = “datacollablab/flower_photos/”
images = spark.read.format("binary").option("recursiveFileLookup", "true") .option("pathGlobFilter", "*.jpg") .load(path_labeled_rawdata) .repartition(4)
## Step 2: define ETL extract functions
def extract_label(path_col):
"""Extract label from file path using built-in SQL functions."""
return regexp_extract(path_col, "flower_photos/([^/]+)", 1)
def extract_size(content):
"""Extract image size from its raw content."""
image = Image.open(io.BytesIO(content))
return image.size
@pandas_udf("width: int, height: int")
def extract_size_udf(content_series):
sizes = content_series.apply(extract_size)
return pd.DataFrame(list(sizes))
## Step 3: construct and execute ETL to generate a data frame
## contains label, image, image size and path for each image.
df = images.select(
col("path"), extract_size_udf(col("content")).alias("size"), extract_label(col("path")).alias("label"), col("content"))
## Step 4: save the image dataframe produced
# by ETL to a Delta Lake table
gold_table_training_dataset = “datacollablab.flower_train_binary” spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed") df.write.format(“delta”).mode(“overwrite”).saveAsTable(gold_table_training_dataset)
>>>
ColumnName: path: string
ColumnName: label: string
ColumnName: labelIndex: bigint
ColumnName: size: struct<width:int, length:int>
ColumnName: content: binary
注意:演示中使用的原始数据是来自TensorFlow团队的花朵数据集。它包含在五个子目录下存储的花朵照片,每个类别一个子目录。子目录的名称是其中包含的图像的标签名称。
现在我们已经在Delta Lake表中构建了一个图像数据集,我们可以借助Petastorm的帮助,使用这个数据集来训练一个PyTorch模型。在代码清单2.14中,我们首先读取由代码清单2.13中定义的ETL流水线生成的Delta Lake表gold_table_training_dataset,然后将数据拆分为两个数据帧:一个用于训练,一个用于验证。接下来,我们将这两个数据帧加载到两个Petastorm Spark转换器中;数据将在转换器内部转换为Parquet文件。最后,我们使用Petastorm API的make_torch_dataloader函数在PyTorch中读取训练样本进行模型训练。请参考以下代码以了解整个三步过程。您还可以在以下链接找到完整的示例代码:mng.bz/wy4B。
## Step 1: Read dataframe from Delta Lake table
df = spark.read.format("delta")
.load(gold_table_training_dataset)
.select(col("content"), col("label_index"))
.limit(100)
num_classes = df.select("label_index").distinct().count()
df_train, df_val = df
.randomSplit([0.9, 0.1], seed=12345)
## (2) Load dataframes into Petastorm converter
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)
# (3) Read training data in PyTorch by using
## Petastorm converter
def train_and_evaluate(lr=0.001):
device = torch.device("cuda")
model = get_model(lr=lr)
.. .. ..
with converter_train.make_torch_dataloader( transform_spec=get_transform_spec(is_train=True), batch_size=BATCH_SIZE) as train_dataloader,
converter_val.make_torch_dataloader( transform_spec=get_transform_spec(is_train=False), batch_size=BATCH_SIZE) as val_dataloader:
train_dataloader_iter = iter(train_dataloader) steps_per_epoch = len(converter_train) // BATCH_SIZE
val_dataloader_iter = iter(val_dataloader)
validation_steps = max(1, len(converter_val) // BATCH_SIZE)
for epoch in range(NUM_EPOCHS):
.. ..
train_loss, train_acc = train_one_epoch( model, criterion, optimizer, exp_lr_scheduler, train_dataloader_iter,steps_per_epoch, epoch,device)
val_loss, val_acc = evaluate( model, criterion, val_dataloader_iter,validation_steps, device)
return val_loss
何时使用Delta Lake
使用Delta Lake的最佳时机是在训练迭代中使用训练数据。
关于Delta Lake的一个常见误解是它只能处理结构化的文本数据,比如销售记录和用户配置文件。但前面的示例展示了它也可以处理非结构化数据,如图像和音频文件;您可以将文件内容作为字节列与其他文件属性一起写入表中,并从中构建数据集。
如果您已经使用Apache Spark构建数据流水线,Delta Lake是进行数据集管理的绝佳选择;它支持结构化和非结构化数据。而且,Delta Lake将数据保存在云对象存储中(如Amazon S3、Azure Blob),它的数据模式强制执行和实时数据更新表支持机制简化了ETL流水线的开发和维护,具有成本效益。最后但并非最不重要的是,时间回溯功能会自动跟踪所有表更新,因此您可以放心进行数据更改并回滚到先前版本的训练数据集。
Delta Lake的限制
使用Delta Lake的最大风险是技术锁定和其陡峭的学习曲线。Delta Lake将表存储在其自己的机制中:基于Parquet的存储、事务日志和索引的组合,这意味着只能由Delta集群进行读写。您需要使用Delta ACID API进行数据摄入和Delta JDBC来运行查询;因此,如果将来决定放弃使用Delta Lake,数据迁移成本将很高。此外,由于Delta Lake与Spark一起使用,如果您对Spark不熟悉,那么您还需要学习很多内容。
关于数据摄入性能,Delta Lake将数据存储到底层的云对象存储中,在使用对象存储操作(如表创建和保存)时,很难实现低延迟的流式处理(毫秒级)。此外,Delta Lake需要为每个ACID事务更新索引;与某些只进行追加式数据写入的ETL相比,它引入了延迟。但在我们看来,对于深度学习项目来说,以秒级为单位的数据摄入延迟并不是问题。如果您对Spark不熟悉,并且不想进行Spark和Delta Lake集群的繁重设置工作,我们还有另一种轻量级的方法供您选择——Pachyderm。
使用云对象存储的Pachyderm
在这一节中,我们想介绍一种轻量级的基于Kubernetes的工具——Pachyderm,用于处理数据集管理。我们将向您展示如何使用Pachyderm来处理图像数据处理和标注的两个示例。但在此之前,让我们先了解一下Pachyderm是什么。
Pachyderm
Pachyderm是一个用于构建面向数据科学的版本控制、自动化、端到端数据管道的工具。它在Kubernetes上运行,并由您选择的对象存储(例如Amazon S3)提供支持。您可以编写自己的Docker镜像用于数据爬取、导入、清理、处理和整理,并使用Pachyderm管道将它们连接在一起。一旦定义了管道,Pachyderm将负责管道的调度、执行和扩展。
Pachyderm提供了数据集版本控制和来源管理(数据血统)。它将每个数据更新(创建、写入、删除等)视为一次提交,并跟踪生成数据更新的数据源。因此,您不仅可以查看数据集的更改历史,还可以将数据集回滚到过去的版本并找到更改的数据来源。图2.18以高层次的视角展示了Pachyderm的工作方式。
在Pachyderm中,数据以Git的风格进行版本控制。每个数据集在Pachyderm中都是一个存储库(repo),它是最高级别的数据对象。存储库包含提交、文件和分支。Pachyderm仅在内部保留元数据(例如审计历史和分支),并将实际文件存储在云对象存储中。
Pachyderm管道执行各种数据转换操作。管道执行用户定义的代码片段(例如,一个Docker容器)来执行操作并处理数据。每个执行称为一个作业。代码清单2.15展示了一个简单的管道定义。这个名为”edges”的管道监视一个名为”images”的数据集。当有新的图像添加到”images”数据集时,管道将启动一个作业,运行”pachyderm/opencv” Docker镜像来解析图像并将其边缘图保存到”edges”数据集中。
{
"pipeline": {
"name": "edges"
},
"description": "A pipeline that performs image \
edge detection by using the OpenCV library.",
"transform": {
"cmd": [ "python3", "/edges.py" ],
"image": "pachyderm/opencv"
},
"input": {
"pfs": {
"repo": "images",
"glob": "/*"
}
}
}
版本和数据溯源
在Pachyderm中,对数据集和管道的任何更改都会自动进行版本控制,您可以使用Pachyderm命令工具pachctl连接到Pachyderm工作区,查看文件历史记录甚至回滚这些更改。以下是使用pachctl命令检查edges数据集的变更历史记录和变更溯源的示例。首先,我们运行pachctl list命令列出edges数据集中的所有提交。在我们的示例中,对edges数据集应用了三个更改(提交):
$ pachctl list commit edges #A
REPO BRANCH COMMIT FINISHED
edges master 0547b62a0b6643adb370e80dc5edde9f 3 minutes ago
edges master eb58294a976347abaf06e35fe3b0da5b 3 minutes ago
edges master f06bc52089714f7aa5421f8877f93b9c 7 minutes ago
要获取数据更改的溯源信息,我们可以使用pachctl inspect命令检查提交。例如,可以使用以下命令检查提交的数据来源。
“eb58294a976347abaf06e35fe3b0da5b”.
$ pachctl inspect commit edges@**eb58294a976347abaf06e35fe3b0da5b \** --full-timestamps
从以下响应中,我们可以看到edges数据集的提交eb58294a976347abaf06e35fe3b0da5b是从images数据集的提交66f4ff89a017412090dc4a542d9b1142计算而来的:
Commit: edges@eb58294a976347abaf06e35fe3b0da5b
Original Branch: master
Parent: f06bc52089714f7aa5421f8877f93b9c
Started: 2021-07-19T05:04:23.652132677Z
Finished: 2021-07-19T05:04:26.867797209Z
Size: 59.38KiB
Provenance: __spec__@91da2f82607b4c40911d48be99fd3031 (edges)
images@66f4ff89a017412090dc4a542d9b1142 (master)
数据溯源功能对于可重现性和故障排除数据集非常有用,因为您始终可以找到过去使用的确切数据,以及创建该数据的数据处理代码。
示例:使用Pachyderm进行图像数据集的标注和训练
在了解了Pachyderm的工作原理之后,让我们看一个使用Pachyderm构建自动化目标检测训练流程的设计建议。对于目标检测模型训练,我们首先需要通过在每个图像上标记目标对象的边界框来准备训练数据集,然后将数据集(包括边界框标签文件和图像)发送给训练代码,以开始模型训练。图2.19展示了使用Pachyderm自动化此工作流程的过程。
在这个设计中,我们使用两个流水线(标注流水线和训练流水线)和两个数据集来构建这个训练流程。在第一步中,我们将图像文件上传到“原始图像数据集”。在第二步中,我们启动标注流水线,运行一个标注应用程序,让用户在图像上绘制边界框来标注目标对象;这些图像是从原始图像数据集中读取的。一旦用户完成标注工作,图像和生成的标签数据将保存到“标注数据集”中。在第三步中,我们向标注数据集添加新的训练数据,这将触发训练流水线启动训练容器并开始模型训练。在第四步中,我们保存模型文件。
除了自动化之外,Pachyderm还会自动对数据进行版本控制,包括原始图像数据集、标注数据集和模型文件。另外,通过利用数据溯源功能,我们可以确定任何给定模型文件使用了哪个版本的标注数据集进行训练,并且这个训练数据是由哪个版本的原始图像数据集生成的。
Pachyderm适用场景
Pachyderm是一个轻量级的方法,可以帮助您轻松构建数据工程流水线,并提供Git风格的数据版本控制支持。它以数据科学家为中心,易于使用。Pachyderm基于Kubernetes,并使用云对象存储作为数据存储,因此对于小型团队来说,它具有成本效益、设置简单且易于维护。我们建议那些拥有自己基础设施的数据科学团队使用Pachyderm,而不是使用Spark。Pachyderm在处理图像和音频等非结构化数据方面表现出色。
Pachyderm的局限性
Pachyderm的不足之处在于缺乏模式保护和数据分析效率。Pachyderm将一切视为文件;它为每个文件版本保留快照,但对文件内容并不关心。在数据写入或读取时没有数据类型验证;它完全依赖于流水线来保护数据的一致性。
缺乏模式意识和保护会为任何持续运行的深度学习训练流水线带来很大风险,因为上游数据处理代码的任何更改都可能破坏下游数据处理或训练代码。此外,如果不了解数据的模式,很难实现数据集比较。
总结
- 数据集管理的主要目标是从各种数据源持续获取新鲜数据,并在支持训练再现性(数据版本跟踪)的同时向模型训练提供数据集。
- 数据集管理组件可以通过并行化模型算法开发和数据工程开发来加快深度学习项目的开发速度。
- 设计数据集管理服务的原则包括:支持数据集的再现性;采用强类型数据模式;设计统一的API,并确保在不同的数据集类型和大小上保持API行为的一致性;保证数据的持久性。
- 数据集管理系统至少应支持(训练)数据集的版本控制,这对于模型的再现性和性能故障排除非常重要。
- 数据集从模型训练的角度来看是静态的,从数据收集的角度来看是动态的。
- 示例数据集管理服务由三个层组成:数据摄取层、内部数据集存储层和训练数据集获取层。
- 在示例数据集管理服务中,我们为每种数据集类型定义了两个数据模式,一个用于数据摄取,一个用于数据集获取。每次数据更新都以提交的形式存储,每个训练数据集都以版本化的快照形式存储。用户可以使用版本哈希字符串随时获取相关的训练数据(数据集的再现性)。
- 示例数据集管理服务支持特殊的数据集类型 – GENERIC数据集。GENERIC数据集没有模式和数据验证,用户可以自由上传和下载数据,因此非常适合原型算法的开发。一旦训练代码和数据集要求成熟,可以将数据集格式提升为强类型数据集。
- Delta Lake和Petastorm可以共同设置用于基于Spark的深度学习项目的数据集管理服务。
- Pachyderm是一个轻量级的基于Kubernetes的数据平台,采用Git风格的数据版本控制,并允许轻松设置流水线。流水线由Docker容器组成,可用于自动化深度学习项目的数据处理流程和训练流程。