DTable – 新型分布式表格实现的早期性能评估

2021年12月8日 | Krystian Guliński

在 Julia 社区最近进行的一项调查中,处理大于可用 RAM 的表格数据的功能位居所有其他优先事项之首。虽然 Julia 已经有一些用于所谓的“内存外处理”的工具,但它们在社区中并不流行,而且大多未得到维护(例如 JuliaDB)。

DTable 计划通过利用当前的 Julia 数据生态系统以及我们现有的分布式计算和内存管理功能,以一种可组合的方式解决这一流行的用例。我们希望它能成为迈向原生 Julia 工具的重要一步,该工具将满足 Julia 社区对内存外表格数据处理的需求!

  1. 什么是 DTable
  2. 为什么选择 DTable?
    1. 目前可用的操作
  3. 初始性能比较(多线程)
    1. 基准测试配置
  4. 基本操作(mapfilterreduce
    1. Map(单列增量)
    2. Filter
    3. Reduce(单列)
    4. Reduce(所有列)
  5. 分组操作
    1. Groupby(洗牌)
    2. 分组缩减(单列)
    3. 分组缩减(所有列)
  6. 实现细节(供感兴趣的用户参考)
  7. 一些注意事项
  8. 结论

什么是 DTable

DTable 是一种表格结构,它提供数据的分布式分区以及在任何支持的环境中对其执行的操作的并行化。它构建在 Dagger.jl 之上,这使其能够在任何工作程序和线程设置中工作,让 Dagger 负责任务调度和内存管理。任何与 Tables.jl 兼容的源都可以被 DTable 摄取,并且 DTable 也可以用作接收器,以防您将数据移动到其他地方(例如 CSV)。

一个关键特性是 DTable 不使用任何专用的结构来存储内存中的表格数据。任何与 Tables.jl 兼容的表格类型都可以用于内部存储,这使得与生态系统具有更大的可组合性,并提供了所选表格类型的所有优势。为了进一步支持这一目标,可以在 DTable 上执行的操作集是通用的,并且仅依赖于 Tables.jl 提供的接口。

下图简单地说明了 DTableGDTable(分组 DTable)的构建方式。提供的表格输入将根据 chunksize 参数或现有分区(使用 Tables.partitions 接口)进行分区。执行 groupby 操作后,数据将相应地进行洗牌,并且将创建新的“块”(Dagger 分区),其中仅包含属于特定键的数据。这些块连同 index 形成一个 GDTable

为什么选择 DTable?

DTable 旨在在两个方面表现出色

目标是与类似的工具(如 DaskSpark)竞争,以便 Julia 用户可以在 Julia 中解决和扩展他们的问题。

通过利用 Julia 数据生态系统的可组合性,我们可以重用许多现有的功能来实现上述目标,并在将来继续改进解决方案,而不是仅仅创建另一个单一的解决方案。

目前可用的操作

以下是目前通常可用的功能列表。要发布有关新操作的建议,请在此 GitHub issue 中发表评论。将来,我们希望提供路线图和计划功能的优先级指标。

初始性能比较(多线程)

下面的基准测试展示了 DTableDataFrames.jl(目前 Julia 中的首选数据处理包)以及 DaskDaggerDTable 的主要竞争对手)的初始性能评估。DataFrames.jl 基准测试旨在提供对 Julia 中当前性能的参考。

请注意,下面的基准测试是专门针对比较相同类型的处理活动而准备的。这意味着基准测试代码已相应调整,以确保这些包在底层执行相同的操作集。

下表显示了在一台机器的多线程环境中获得的结果汇总(下一节中提供了确切的设置)。比较每个基准测试的每个配置中的时间,并在表中进行汇总。负值表示相对于竞争对手的减速。

操作比 Dask 快多少倍比 DataFrames.jl 快多少倍
Map4.94.92.5-2.5
Filter1.6-1.614.5-14.5
Reduce(单列)31.131.12.92.9
Reduce(所有列)27.127.13.73.7
Groupby(洗牌)17.517.5974.5-974.5
按组缩减(单列)20.220.2343.0-343.0
按组缩减(所有列)22.322.3166.1-166.1

基准测试配置

基准测试代码和原始结果可以在 此存储库 中找到。

所有基准测试运行都在具有以下规格的台式机上执行

所有配置都使用一个具有 1 个工作程序和 16 个线程的环境运行。

实验使用的数据如下准备

下图总结了上述规范

基本操作(mapfilterreduce

这三种操作是任何表格结构的大部分功能的基础。通过查看它们的性能,我们可以很好地了解表格在许多常见的数据转换场景中的表现。

这些基本操作不受唯一值数量的影响,因此此处未包含这些比较的结果。

Map(单列增量)

在第一个基准测试中,我们对整个表格执行简单的 map 操作。

乍一看很明显,DTableDask 中存在的来自分区和并行化的开销在这个基准测试中并没有得到回报。DataFrames.jl 包在此处处于领先地位,DTable 平均慢 2.5 倍。

在较小的 chunksize(10^6)下,DTable 的扩展性优于其竞争对手,而竞争对手不受此参数的影响很大。总体而言,DTable 在所有测试配置中与 Dask 相比实现了平均约 4.9 倍的加速。

DTable 命令:map(row -> (r = row.a1 + 1,), d)

Filter

由于值的集合是有限的,因此选择了一个简单的过滤器表达式,它过滤掉大约一半的记录(下面的命令)。

在这种情况下,并行化和分区带来的开销并没有带来很好的收益,因为DTableDask都明显比DataFrames.jl慢。在比较这两种实现时,性能看起来非常相似,Dask平均比DTable快1.6倍。

DTable的可扩展性使其能够在最大数据集上追赶上DataFrames。这种行为可能在更大的数据集上继续存在,并且最终在某个阈值之后相对于DataFrames提供加速。

DTable命令:filter(row -> row.a1 < unique_values ÷ 2, d)

Reduce(单列)

Reduce基准测试是DTable真正闪光的地方。这项任务可以轻松利用数据的分区来实现速度提升。

DTable不仅成功地实现了比DataFrames.jl更快的速度(平均快约2.9倍),而且还通过提供约31.1倍的速度提升显著击败了Dask的性能。

请注意,DTableDataFrames.jl都使用OnlineStats.jl来获取方差,而Dask使用其自身的原生实现。本文中所有reduce基准测试都侧重于测试经典reduce函数(如(acc, x) -> acc + x)的性能。

为了计算常用统计量,DataFrames.jl用户应该使用Statistics.jl中提供的数组函数(例如meanvar)。它们提供了更好的性能,但这里没有使用,因为它们不是经典的reduce函数。

DTable命令:reduce(fit!, d, cols=[:a1], init=Variance())

Reduce(所有列)

与之前的基准测试类似,DTable在这里表现非常出色,相对于DataFrames.jl提供了约3.7倍的速度提升,相对于Dask提供了约27.1倍的速度提升。

将来可以为宽表启用额外的并行化。目前,DTable将所有列的reduce作为单个任务执行。

DTable命令:reduce(fit!, d, init=Variance())

分组操作

表洗牌绝对是可以在表上执行的最耗时的操作之一,因此我们尽早解决了它,以评估当前的技术栈是否可以执行此类操作。

在以下基准测试中,groupby(洗牌)和分组reduce的性能将受到考验。GDTable(分组DTable)还提供其他操作,如mapfilter,但它们的工作方式与在DTable上执行时相同,因此之前显示的基准测试仍然适用。

以下基准测试包括在测试中获得的结果,这些测试具有不同的unique_values计数,因为它们的数目直接影响通过分组操作生成的组数。

请注意,测试场景已专门调整,以确保基准测试测量相同类型的活动(数据洗牌)。最值得注意的是,Dask基准测试显式地使用shuffle而不是groupby来避免优化的groupby/reduce例程,这些例程不执行数据移动。一旦DTable也支持这些优化,将来可以进行更好的比较。

Groupby(洗牌)

在本实验中,我们研究了各种数据配置中的洗牌性能。DataFrames.jl在groupby上不执行数据移动,因此其性能明显优于其他两种技术,仅用于参考目的。

让我们关注DaskDTable,它们在洗牌过程中执行数据移动。在不同的数据配置中,我们可以看到一个共同的模式,即DTable在较小的数据规模上明显快于Dask,这导致它提供了平均约17.5倍的速度提升,但随着数据规模的增长,Dask的可扩展性更好,最终与DTable的速度相匹配。

但是,在更苛刻的配置中(其中unique_values计数等于10410^4),Dask反复无法完成超过一定数据大小的洗牌(nn > 10810^8)。因此,以下基准测试将不包括这些失败测试的结果。这些配置也从平均性能比较中排除。

DTable成功完成了这些复杂的场景,并且没有观察到对可扩展性的任何影响,这是一个好兆头,但需要在更大的数据集上进行未来的测试以更深入地了解当前的洗牌算法的性能。

DTable命令:Dagger.groupby(d, :a1)

分组缩减(单列)

效仿reduce基准测试的成功,DTable在这里再次比直接竞争对手表现得更好。对于单列reduce,它比Dask平均快约20.2倍,并且它们的可扩展性行为看起来非常相似。

与标准的reduce基准测试相反,DTable在所有数据规模上都没有提供比DataFrames.jl更快的速度。看起来当前的算法存在明显的开销,可以将其视为较小数据规模下性能的下限。对于unique_values计数较小的基准测试,DTable在更大的数据规模上能够赶上DataFrames.jl。这可能表明,通过进一步增加数据规模,我们最终可能会达到DTable在这种情况下提供比DataFrames.jl更好的性能的点。

DTable命令:r = reduce(fit!, g, cols=[:a2], init=Mean())

分组缩减(所有列)

所有列reduce的结果看起来与单列非常相似。DTable成功地提供了比Dask平均快约22.3倍的速度。

同样,由于显著的入口开销在较小数据规模下充当性能下限,DTable在较小数据规模上严重落后于DataFrames.jl

DTable命令:r = reduce(fit!, g, init=Mean())

实现细节(供感兴趣的用户参考)

DTable构建在DaggerTables.jl之上,目前位于Dagger.jl包中。这意味着它可以在Dagger能够运行的任何环境中运行。您应该能够在本地机器上的线程环境中有效地使用DTable,在具有许多工作线程的更大机器上使用,或者将工作负载分散到集群中的多台机器和工作线程上。

DTable使用Dagger新的“急切API”,这意味着所有并行化的调用都使用Dagger.spawnDagger.@spawn执行。内存由Dagger通过使用MemPool.jl进行管理。将来对相关项目的升级将有望为DTable带来性能和功能改进。

由于DTable对其他项目的依赖,它完全专注于提供与Tables.jl兼容的算法和接口,以满足处理大型表格数据日益增长的需求。

我们希望Tables.jl接口将扩展到包含更广泛的功能,同时仍然提供与其他Julia包的良好互操作性。

有关更多详细信息,请访问Dagger文档

一些注意事项

有一些尚未合并到Julia中的待处理PR改进了Distributed的线程安全,这直接影响了Dagger.jl的稳定性。当在线程或混合环境中广泛使用DTable时,用户体验可能会偶尔因偶尔挂起或崩溃而中断。

我们希望在未来的Julia 1.7补丁中包含所有必要的修复。

结论

DTable已成功通过概念验证阶段,目前作为Dagger.jl包的一部分正在积极开发中。

此次早期性能评估证实,DTable有潜力成为处理表格数据的竞争工具。它在7个基准测试中的6个中实现了比直接竞争对手(Dask)显著更好的性能,在其余一个中也没有落后太多。虽然这看起来很有希望,但仍然有很多工作要做,以便使DTable功能丰富且更快,因此请关注该项目的未来更新。

作为本文一部分介绍的功能从今天起普遍可用。我们强烈鼓励每个人查看文档并尝试包含的示例!由于DTable仍处于早期开发阶段,因此完全有可能提供反馈并影响路线图和未来的设计决策。