软件性能越来越依赖于利用多个处理器核心。摩尔定律带来的“免费午餐”已经结束。好吧,我们在 Julia 开发者社区以关注性能而闻名。为了追求性能,我们已经构建了许多用于多进程、分布式和 GPU 的功能,但多年来我们都知道,我们也需要一个关于可组合多线程的良好方案。今天,我们很高兴地宣布这个故事中的一个重要的新篇章。我们发布了一个 Julia 程序全新线程接口的预览版:受 Cilk、Intel Threading Building Blocks 和 Go 等并行编程系统启发的通用任务并行性。任务并行性现已在 v1.3.0-alpha 版本中提供,这是 Julia 1.3.0 版本的早期预览版,预计将在几个月内发布。您可以在下载页面上找到具有此功能的二进制文件,或者从源代码构建主分支。
在这种范式中,程序的任何一部分都可以被标记为并行执行,并且会自动启动一个“任务”在可用的线程上运行该代码。一个动态调度程序会为您处理所有决策和细节。以下是在 Julia 中现在可以编写的并行代码示例
import Base.Threads.@spawn
function fib(n::Int)
if n < 2
return n
end
t = @spawn fib(n - 2)
return fib(n - 1) + fetch(t)
end
当然,这是斐波那契数列的经典高度低效的树递归实现——但在任意数量的处理器核心上运行!t = @spawn fib(n - 2)
这一行启动了一个任务来计算 fib(n - 2)
,它与计算 fib(n - 1)
的下一行并行运行。fetch(t)
等待任务 t
完成并获取其返回值。
这种并行模型具有许多优良特性。我们认为它类似于垃圾回收:使用 GC,您可以自由地分配对象,而无需担心何时以及如何释放它们。使用任务并行性,您可以自由地生成任务——可能有数百万个——而无需担心它们在哪里运行。
该模型可移植且不受底层细节的限制。您无需显式启动和停止线程,甚至无需知道有多少处理器或线程(尽管您可以在需要时找到)。
该模型是可嵌套和可组合的:您可以启动调用库函数的并行任务,这些库函数本身会启动并行任务,并且所有这些都能正常工作。您的 CPU 不会出现线程过度订阅的情况。此属性对于高级语言至关重要,因为库函数会完成大量工作。您需要能够自由地编写任何所需的代码——包括并行代码——而无需担心它调用的库是如何实现的(目前仅限于 Julia 代码,但将来我们计划将其扩展到诸如 BLAS 之类的原生库)。
事实上,这就是我们对本次公告感到兴奋的主要原因:从现在起,在整个 Julia 包生态系统中,添加多核并行性的能力都得到了释放。
这个新功能最令人惊讶的方面之一是它开发了多久。从一开始——甚至在 0.1 版本之前——Julia 就拥有提供对称协程的 Task
类型,我们将其用于基于事件的 I/O。因此,我们一直以来都在语言中拥有并发(独立的执行流)单元,只是它还没有并行(同时的执行流)。不过,我们知道我们需要基于线程的并行性,因此在 2014 年(大约是 0.3 版本的时间段),我们开始了使我们的代码线程安全的漫长过程。Yu Yichao 在垃圾收集器和线程局部存储性能方面做了一些特别令人印象深刻的工作。其中一位作者(Kiran)设计了一些用于调度多个线程和管理原子数据结构的基本基础设施。
大约两年后的 0.5 版本中,我们发布了具有“实验性”状态的 @threads for
宏,它可以处理在所有核心上运行的简单并行循环。即使那不是我们想要的最终设计,它也完成了两个重要的任务:它允许 Julia 程序员开始利用多个核心,并提供了测试用例来消除运行时中与线程相关的错误。然而,最初的 @threads
有一些巨大的限制:@threads
循环不能嵌套:如果它们调用的函数递归地使用 @threads
,则这些内部循环只会占用调用它们的 CPU。它也与我们的 Task
和 I/O 系统不兼容:您不能在带线程的循环内执行任何 I/O 或在 Task
之间切换。
因此,下一个合乎逻辑的步骤是合并 Task
和线程系统,并“简单地”(引发笑声)允许 Task
在线程池中同时运行。我们与 Arch Robison(当时在英特尔)进行了许多早期的讨论,并得出结论,这是我们语言的最佳模型。在 0.5 版本(大约 2016 年)之后,Kiran 开始使用基于深度优先调度思想的新并行任务调度程序partr 进行实验。他用一些漂亮的动画幻灯片说服了我们所有人,而且他愿意做一些工作也没有坏处。该计划是首先将 partr 开发为一个独立的 C 库,以便可以单独对其进行测试和基准测试,然后将其与 Julia 运行时集成。
在 Kiran 完成 partr 的独立版本后,我们中的几个人(这篇文章的作者,以及 Keno Fischer 和英特尔的 Anton Malakhov)开始了一系列面对面的工作会议,以确定如何进行集成。Julia 运行时带来了许多额外的功能,例如垃圾回收和基于事件的 I/O,因此这并非完全简单。有点令人失望的是,对于一个复杂的软件项目来说,这并不罕见,它花费的时间比预期的要长得多——将近两年——才使新系统可靠地工作。本文的后面部分将为好奇的人解释一些内部细节和涉及的困难。但首先,让我们试用一下。
要在使用多个线程的情况下使用 Julia,请设置 JULIA_NUM_THREADS
环境变量
$ JULIA_NUM_THREADS=4 ./julia
Juno IDE 会根据可用处理器核心的数量自动设置线程数,并提供一个用于更改线程数的图形界面,因此在该环境中无需手动设置变量。
Base
的 Threads
子模块包含大多数特定于线程的功能,例如查询线程数和当前线程的 ID
julia> Threads.nthreads()
4
julia> Threads.threadid()
1
现有的 @threads for
用法仍然有效,并且现在完全支持 I/O
julia> Threads.@threads for i = 1:10
println("i = $i on thread $(Threads.threadid())")
end
i = 1 on thread 1
i = 7 on thread 3
i = 2 on thread 1
i = 8 on thread 3
i = 3 on thread 1
i = 9 on thread 4
i = 10 on thread 4
i = 4 on thread 2
i = 5 on thread 2
i = 6 on thread 2
事不宜迟,让我们尝试一些嵌套的并行性。一个经久不衰的示例是归并排序,它将输入分成两半并递归地对每一半进行排序。这两半可以独立排序,从而为并行性提供了自然的机会。以下是该代码
import Base.Threads.@spawn
# sort the elements of `v` in place, from indices `lo` to `hi` inclusive
function psort!(v, lo::Int=1, hi::Int=length(v))
if lo >= hi # 1 or 0 elements; nothing to do
return v
end
if hi - lo < 100000 # below some cutoff, run in serial
sort!(view(v, lo:hi), alg = MergeSort)
return v
end
mid = (lo+hi)>>>1 # find the midpoint
half = @spawn psort!(v, lo, mid) # task to sort the lower half; will run
psort!(v, mid+1, hi) # in parallel with the current call sorting
# the upper half
wait(half) # wait for the lower half to finish
temp = v[lo:mid] # workspace for merging
i, k, j = 1, lo, mid+1 # merge the two sorted sub-arrays
@inbounds while k < j <= hi
if v[j] < temp[i]
v[k] = v[j]
j += 1
else
v[k] = temp[i]
i += 1
end
k += 1
end
@inbounds while k < j
v[k] = temp[i]
k += 1
i += 1
end
return v
end
这只是一个标准的归并排序实现,类似于 Julia 的 Base
库中的实现,只是在其中一个递归调用中添加了 @spawn
结构。Julia 的 Distributed
标准库也已经导出了一个 @spawn
宏相当长一段时间了,但我们计划将其弃用,转而使用新的线程含义(尽管它在 1.x 版本中仍然可用,以确保向后兼容)。这种表达并行性的方式在共享内存中更有用,“spawn”是任务并行 API 中一个相当标准的术语(例如,在 Cilk 和 TBB 中使用)。
wait
只需等待指定的任务完成。该代码通过修改其输入来工作,因此我们不需要任务的返回值。指示不需要返回值是与前面 fib
示例中使用的 fetch
调用唯一的区别。请注意,我们在调用 Julia 的标准 sort!
时显式请求 MergeSort
,以确保我们比较的是相同的东西——sort!
实际上默认使用快速排序对数字进行排序,对于随机数据,快速排序往往更快。让我们在 JULIA_NUM_THREADS=2
下计时该代码
julia> a = rand(20000000);
julia> b = copy(a); @time sort!(b, alg = MergeSort); # single-threaded
2.589243 seconds (11 allocations: 76.294 MiB, 0.17% gc time)
julia> b = copy(a); @time sort!(b, alg = MergeSort);
2.582697 seconds (11 allocations: 76.294 MiB, 2.25% gc time)
julia> b = copy(a); @time psort!(b); # two threads
1.770902 seconds (3.78 k allocations: 686.935 MiB, 4.25% gc time)
julia> b = copy(a); @time psort!(b);
1.741141 seconds (3.78 k allocations: 686.935 MiB, 4.16% gc time)
虽然运行时间有点不稳定,但我们确实看到了使用两个线程带来的加速效果。我们运行此代码的笔记本电脑有四个超线程,如果我们添加第三个线程,性能持续提高尤其令人惊叹
julia> b = copy(a); @time psort!(b);
1.511860 seconds (3.77 k allocations: 686.935 MiB, 6.45% gc time)
考虑一下这个在三个线程上运行的双向分解算法可能会让你头疼!在我们看来,这有助于强调此接口使并行性感觉多么“自动”。
让我们尝试一台单线程性能略低但 CPU 核心更多的机器
$ for n in 1 2 4 8 16; do JULIA_NUM_THREADS=$n ./julia psort.jl; done
2.949212 seconds (3.58 k allocations: 686.932 MiB, 4.70% gc time)
1.861985 seconds (3.77 k allocations: 686.935 MiB, 9.32% gc time)
1.112285 seconds (3.78 k allocations: 686.935 MiB, 4.45% gc time)
0.787816 seconds (3.80 k allocations: 686.935 MiB, 2.08% gc time)
0.655762 seconds (3.79 k allocations: 686.935 MiB, 4.62% gc time)
psort.jl
脚本只是定义了 psort!
函数,调用它一次以避免测量编译开销,然后运行我们上面使用的相同命令。
请注意,尽管并行代码分配的内存大大多于标准例程,但仍然发生了这种加速。分配来自两个来源:Task
对象和每个调用中分配的 temp
数组。参考排序例程在所有递归调用中重用单个临时缓冲区。在并行性下重用临时数组更困难,但仍然可能——稍后会详细介绍。
在 1.3 系列中,新的线程运行时被认为处于测试阶段。一个“正式”版本将在以后的版本中出现,以便我们有时间确定一个我们可以长期承诺的 API。如果您想在此期间升级您的代码,则需要了解以下内容。
为了帮助兼容性,代码默认情况下将继续在单个线程中运行。当使用现有原语(schedule
、@async
)启动任务时,它们将仅在其启动它们的线程内运行。类似地,Condition
对象(用于在事件发生时向任务发出信号)只能由创建它的线程使用。尝试从其他线程等待或通知条件将引发错误。已添加单独的线程安全条件变量,并可作为 Threads.Condition
使用。这需要成为一个单独的类型,因为条件变量的线程安全使用需要获取锁。在 Julia 中,锁与条件捆绑在一起,因此 lock
可以简单地调用条件本身
lock(cond::Threads.Condition)
try
while !ready
wait(cond)
end
finally
unlock(cond)
end
与以前的版本一样,用于保护关键部分的标准锁是 ReentrantLock
,它现在是线程安全的(它以前仅用于同步任务)。还有一些其他类型的锁(Threads.SpinLock
和 Threads.Mutex
),主要用于内部目的。在以下罕见情况下使用它们:(1)只有线程而不是任务将被同步,以及(2)您知道锁只会保持很短时间。
Threads
模块还提供了 Semaphore
和 Event
类型,它们具有其标准定义。
Julia 代码天生倾向于纯函数式(无副作用或变异),或者仅使用局部变异,因此在许多情况下迁移到完全线程安全应该很容易。但是,如果您的代码使用了共享状态,并且您希望使其线程安全,则需要做一些工作。到目前为止,我们在 Julia 的标准库中使用了两种方法来解决这个问题:同步(锁)和线程或任务局部状态。对于不经常访问的共享资源,或者对于无法为每个线程复制的资源,锁效果很好。
但是对于高性能代码,我们推荐使用线程局部状态。我们上面提到的 psort!
例程可以通过这种方式改进。以下是一个方案。首先,我们修改函数签名以接受预分配的缓冲区,使用默认参数值在调用者未提供缓冲区时自动分配空间。
function psort!(v, lo::Int=1, hi::Int=length(v), temps=[similar(v, 0) for i = 1:Threads.nthreads()])
我们只需要为每个线程分配一个最初为空的数组。接下来,我们修改递归调用以重用该空间。
half = @spawn psort!(v, lo, mid, temps)
psort!(v, mid+1, hi, temps)
最后,使用为当前线程保留的数组,而不是分配一个新的数组,并在需要时调整其大小。
temp = temps[Threads.threadid()]
length(temp) < mid-lo+1 && resize!(temp, mid-lo+1)
copyto!(temp, 1, v, lo, mid-lo+1)
在进行这些小的修改后,让我们在我们的大型机器上检查性能。
$ for n in 1 2 4 8 16; do JULIA_NUM_THREADS=$n ./julia psort.jl; done
2.813555 seconds (3.08 k allocations: 153.448 MiB, 1.44% gc time)
1.731088 seconds (3.28 k allocations: 192.195 MiB, 0.37% gc time)
1.028344 seconds (3.30 k allocations: 221.997 MiB, 0.37% gc time)
0.750888 seconds (3.31 k allocations: 267.298 MiB, 0.54% gc time)
0.620054 seconds (3.38 k allocations: 298.295 MiB, 0.77% gc time)
我们对 Julia 的默认全局随机数生成器(rand()
及其相关函数)采用的方法是使其特定于线程。在首次使用时,每个线程将创建一个默认 RNG 类型(当前为 MersenneTwister
)的独立实例,该实例使用系统熵进行播种。所有影响随机数状态的操作(rand
、randn
、seed!
等)随后将仅作用于当前线程的 RNG 状态。这样,多个独立的代码序列(先播种然后使用随机数)将分别按预期工作。
如果您需要所有线程使用已知的初始种子,则需要显式设置它。对于这种更精确的控制或更好的性能,我们建议分配并传递您自己的 RNG 对象(例如 Random.MersenneTwister()
)。
与垃圾回收一样,简单的接口(@spawn
)掩盖了底层巨大的复杂性。在这里,我们将尝试总结我们面临的一些主要困难和设计决策。
每个 Task
都需要它自己的执行堆栈,与 Unix 操作系统提供的常规进程或线程堆栈不同。Windows 有纤维,与任务非常接近,并且对于 Unix 系列系统存在几个类似抽象的库实现。
有很多可能的方法可以实现这一点,并且权衡不同。正如我们经常做的那样,我们试图选择一种可以最大化吞吐量和可靠性的方法。我们有一个由 mmap
(Windows 上的 VirtualAlloc
)分配的共享堆栈池,每个堆栈默认为 4MiB(32 位系统上为 2MiB)。这可能会使用相当多的虚拟内存,因此如果 top
显示您闪亮的新多线程 Julia 代码使用了 100GiB 的地址空间,请不要惊慌。绝大多数空间不会消耗实际资源,并且仅在任务需要执行深度调用链(希望不会持续很长时间)时才存在。这些堆栈比低级语言中的任务系统可能提供的堆栈更大,但我们认为它充分利用了 CPU 和 OS 内核高度完善的内存管理功能,同时大大降低了堆栈溢出的可能性。
默认堆栈大小是构建时选项,在 src/options.h
中设置。Task
构造函数还有一个未记录的第二个参数,允许您为每个任务指定堆栈大小。不建议使用它,因为很难预测需要多少堆栈空间,例如编译器或调用的库。
线程可以通过调整其寄存器以“从”之前切换到该任务的切换“返回”来切换到运行给定任务。在我们开始运行它之前,我们会从本地池中分配一个新的堆栈。一旦任务完成运行,我们就可以立即将它的堆栈释放回池中,避免过多的 GC 压力。
我们还有一个堆栈切换的替代实现(由 options.h
中的 ALWAYS_COPY_STACKS
变量控制),它通过在任务切换发生时复制活动堆栈数据来以时间换取内存。这可能与使用 cfunction
的外部代码不兼容,因此它不是默认设置。
如果堆栈消耗了过多的地址空间,我们将回退到此实现(某些平台——特别是 Linux 和 32 位机器——施加了相当低的限制)。当然,每个实现都有针对多个平台和体系结构的代码,有时使用内联汇编进行进一步优化。堆栈切换是一个丰富的主题,它本身就可以写一篇博文。
我们使用 libuv 进行跨平台基于事件的 I/O。它旨在能够在多线程程序中运行,但本身并不是一个多线程 I/O 库,因此不支持开箱即用的多线程并发使用。目前,我们使用单个全局锁来保护对 libuv 结构的访问,然后允许任何线程(一次一个)运行事件循环。当另一个线程需要事件循环线程唤醒时,它会发出异步信号。这可能由于多种原因发生,包括另一个线程调度新工作、另一个线程开始运行垃圾回收或另一个线程想要获取 IO 锁以执行 IO。
通常,任务可能在一个线程上开始运行,阻塞一段时间,然后在另一个线程上重新启动。这改变了有关线程局部值何时可以更改的基本假设。在内部,Julia 代码不断使用线程局部变量,例如每次分配内存时。我们尚未开始进行支持迁移所需的所有更改,因此目前任务必须始终在其开始运行的线程上运行(尽管当然,它可以在任何线程上开始运行)。为了支持这一点,我们有一个“粘性”任务的概念,该任务必须在给定线程上运行,以及与每个线程关联的用于运行任务的每个线程队列。
当没有足够的任务来使所有线程保持忙碌时,一些线程需要休眠以避免始终使用 100% 的所有 CPU。这是一个棘手的同步问题,因为某些线程可能会在其他线程决定休眠时调度新工作。
当任务阻塞时,我们需要调用调度程序来选择另一个要运行的任务。该代码使用哪个堆栈运行?可以有一个专用的调度程序任务,但我们认为如果我们允许调度程序代码在最近阻塞的任务的上下文中运行,则开销会更小。这工作得很好,但这意味着任务可以存在于一个奇怪的中间状态,其中它被认为没有运行,但实际上正在运行调度程序。其含义之一是我们可能会从调度程序队列中提取一个任务,只是意识到我们根本不需要切换。
在尝试使这项新功能正常工作时,我们遇到了一些令人抓狂的难以解决的错误。最明显的错误是在 Windows 上的神秘挂起,通过字面意思翻转单个位来修复。
另一个好的例子是缺少异常处理个性。事后看来,这本来可能是简单的,但由于两个因素而变得复杂:首先,故障模式导致内核以我们无法在调试器中拦截的方式停止我们的进程,其次,故障是由一个看似无关的更改触发的。所有 Julia 堆栈帧都设置了异常处理个性,因此问题只能出现在运行时系统中,在任何 Julia 帧之外——这是一个狭窄的窗口,因为当然我们通常在执行 Julia 代码。
虽然我们对这一里程碑感到兴奋,但仍有很多工作要做。此 alpha 版本引入了 @spawn
构造,但并非旨在最终确定其设计。以下是一些我们希望重点关注以进一步发展我们的线程功能的要点。
任务切换和 I/O 延迟的性能工作。
向标准库添加并行性。许多常见的操作,如排序和数组广播,现在可以在内部使用多个线程。
考虑允许任务迁移。
在 Julia 层面提供更多原子操作。
在编译器中使用多线程。
更具性能的并行循环和归约,以及更多调度选项。
允许在运行时添加更多线程。
改进的调试工具。
探索 API 扩展,例如取消点。
线程安全数据结构的标准库。
提供替代调度程序。
我们要感谢英特尔和relationalAI提供的资金支持,这使得开发这些新功能成为可能。
我们还要感谢在开发过程中耐心尝试此功能并提交错误报告或拉取请求的各位,并激励我们继续前进。如果您在 Julia 中使用线程时遇到任何问题,请在GitHub或我们的Discourse论坛上告知我们!