关于 threadid() 和 nthreads() 的常见误解
部分原因是我们的并行和并发接口的演变历史[1],一些 Julia 程序员编写了错误的并行代码,这些代码可能存在竞争条件,从而导致结果错误。这种模式甚至在以前的官方 Julia 博客文章中被错误地推荐过。为了生态系统的稳定性和正确性,必须识别并修复这些用法。
此错误代码遵循的通用模板如下所示
using Base.Threads: nthreads, @threads, threadid
states = [some_initial_value for _ in 1:nthreads()]
@threads for x in some_data
tid = threadid()
old_val = states[tid]
new_val = some_operator(old_val, f(x))
states[tid] = new_val
end
do_something(states)
以上代码是错误的,因为 @threads
生成的任务允许在执行过程中让出给其他任务[2]。这意味着在读取 old_val
和将 new_val
存储到存储区之间,任务可能会暂停,并且在同一线程上运行且具有相同 threadid()
的新任务可能会并发地写入 states[tid]
,从而导致竞争条件,并导致工作丢失。
这实际上并不是多线程特有的问题,而是一个并发问题,即使只有一个线程也能证明这一点。例如
$ julia --threads=1
julia> f(i) = (sleep(0.001); i);
julia> let state = [0], N=100
@sync for i ∈ 1:N
Threads.@spawn begin
tid = Threads.threadid() # Each task gets `tid = 1`.
old_var = state[tid] # Each task reads the current value, which for
# all is 0 (!) because...
new_var = old_var + f(i) # ...the `sleep` in `f` causes all tasks to pause
# *simultaneously* here (all loop iterations start,
# but do not yet finish).
state[tid] = new_var # After being released from the `sleep`, each task
# sets `state[1]` to `i`.
end
end
sum(state), sum(1:N)
end
(100, 5050)
在上面的代码段中,我们故意使用 100
个单独的任务过度订阅了 CPU,以使错误更有可能出现,但即使没有生成很多任务,也可能出现此问题。
包或用户代码中任何使用 threadid()
的情况都应视为警告信号,表明代码依赖于实现细节,并且容易出现并发错误。
@threads
替换为 @threads :static
如果您碰巧使用 @threads
,最简单(但从长远来看不推荐)的快速修复方法是将 @threads for ...
的用法替换为 @threads :static for ...
。这样做的原因是 @threads
的 :static
调度程序不允许导致错误的异步任务迁移和让出。
但是,这不是一个好的长期解决方案,因为
它依赖于不明显的隐式保护措施来防止原本错误的代码变得正确
@threads :static
不是协作的或可组合的,这意味着如果您的 @threads :static
循环内的代码也执行多线程,您的代码可能会降低到比单线程速度更慢,甚至死锁。
如果您想要一个食谱,可以使用它将上述错误的食谱替换为仅使用 Base.Threads
模块编写的食谱,我们建议您远离 @threads
,而是直接使用 @spawn
来创建和管理任务。原因是 @threads
没有任何内置机制来管理和合并来自不同线程的工作结果,而任务可以以安全的方式管理和返回自己的状态。
任务创建和返回自己的状态本质上比并行任务的生成器为生成的子任务设置要读取和写入的状态更安全。
替换上面显示的错误代码模式的代码可能如下所示
using Base.Threads: nthreads, @threads, @spawn
using Base.Iterators: partition
tasks_per_thread = 2 # customize this as needed. More tasks have more overhead, but better
# load balancing
chunk_size = max(1, length(some_data) ÷ (tasks_per_thread * nthreads()))
data_chunks = partition(some_data, chunk_size) # partition your data into chunks that
# individual tasks will deal with
#See also ChunkSplitters.jl and SplittablesBase.jl for partitioning data
tasks = map(data_chunks) do chunk
# Each chunk of your data gets its own spawned task that does its own local, sequential work
# and then returns the result
@spawn begin
state = some_initial_value
for x in chunk
state = some_operator(state, f(x))
end
return state
end
end
states = fetch.(tasks) # get all the values returned by the individual tasks. fetch is type
# unstable, so you may optionally want to assert a specific return type.
do_something(states)
这是对旧有错误模式的完全通用替换。但是,对于许多应用程序,这应该简化为 mapreduce
的并行版本
using Base.Threads: nthreads, @spawn
function tmapreduce(f, op, itr; tasks_per_thread::Int = 2, kwargs...)
chunk_size = max(1, length(itr) ÷ (tasks_per_thread * nthreads()))
tasks = map(Iterators.partition(itr, chunk_size)) do chunk
@spawn mapreduce(f, op, chunk; kwargs...)
end
mapreduce(fetch, op, tasks; kwargs...)
end
在 tmapreduce(f, op, itr)
中,函数 f
应用于 itr
的每个元素,然后是关联[3]的双参数函数 op
。
希望以上 tmapreduce
可以在不久的将来添加到基础 Julia 中。然而,在此期间,编写自己的代码相对简单,如上所示。
我们鼓励大家查看各种多线程库,例如 Transducers.jl(或各种前端,如 ThreadsX.jl、FLoops.jl 和 Folds.jl)以及 MultiThreadedCaches.jl。
Transducers.jl 从根本上讲是关于以结构化和原则性的方式表达数据遍历,通常的好处是它使并行计算更容易推理。
以上 tmapreduce(f, op, itr)
可以表示为
using Transducers
itr |> Map(f) |> foldxt(op; init=some_initial_value)
或
using Transducers
foldxt(op, Map(f), itr; init=some_initial_value)
列出的各种前端为 Transducers.jl 提供了不同的 API,有些人可能会发现这些 API 更易于使用。例如
using ThreadsX
ThreasdX.mapreduce(f, op, itr; init=some_initial_value)
或
using FLoops
@floop for x in itr
@reduce out = op(some_initial_value, f(x))
end
out
另一方面,MultiThreadedCaches.jl 试图通过使用锁机制来阻止数据竞争,从而使 states[threadid()]
类的模式更安全。我们认为这不是一个理想的解决方法,但它可能使过渡到更安全的代码变得更容易,并且需要更少的重写,例如在包必须管理可能由用户并发写入的状态并且包无法控制用户如何构建其代码的情况下。
[1] | 并发与并行 |
在 Julia 中,任务是表达并发的基本机制。并发是指同时执行多个程序或任务的能力。
任务将映射到任意数量的“工作线程”,这将导致它们并行执行。这通常称为 M:N
线程或绿色线程。即使 Julia 只用一个工作线程启动,程序员也可以表达并发操作。
在 Julia 的早期版本中,@async
宏用于调度并发任务,这些任务在单个线程上异步执行。后来,开发了 @threads
宏用于 CPU 并行,并允许用户轻松地并行执行循环的块,但在当时,这与语言中的任务概念是分离的。这导致了各种可组合性问题,并促使语言发生变化。
Julia 中的并发模型一直在随着次要版本而发展。Julia v1.3 引入了任务的并行调度程序和 Threads.@spawn
;v1.5 引入了 @threads :static
,目的是在将来的版本中更改默认调度。Julia v1.7 启用了任务迁移,Julia v1.8 将 @threads
的默认值更改为动态调度程序。
当引入并行调度程序时,我们决定在野外使用 @async
并期望特定语义的代码太多了,因此 Threads.@spawn
可用于访问新的语义。@async
有各种问题,我们不鼓励在新的代码中使用它。
使用 threadid
/nthreads
/maxthreadid
随着时间的推移,添加了一些功能,使得依赖 threadid
、nthreads
甚至 maxthreadid
变得危险
任务迁移:任务可以在其执行期间观察多个 threadid
。
交互式优先级:nthreads()
将报告非交互式工作线程的数量,因此低估了活动线程的数量。
线程采用(v1.9):外部线程现在可以在程序执行的任何时间被采用(并在稍后被删除)。
GC 线程:运行时可以使用其他线程来加速执行垃圾回收等工作。
任何依赖于特定 threadid
保持不变或在执行期间线程数量保持不变的代码都注定是错误的。根据经验,程序员最多应该查询线程数量以激发启发式方法,例如如何分配并行工作,但程序通常不应编写为依赖于线程的实现细节以确保正确性。相反,程序员应该推理任务,即可能与其他代码并发执行的工作片段,而与用于执行它们的线程数量无关。
[2] | 不要试图推理让出 |
许多现有的线程局部状态用法碰巧相对健壮,并且仅给出正确答案,因为它们在执行期间调用的函数不会让出。然后人们可能会认为“好吧,我可以通过确保我的代码不会让出就可以避免这个问题”,但我们认为这是一个糟糕且不可持续的想法,因为函数调用是否会让出是不稳定的、不明显的或难以检查的。
例如,如果函数 f
更新为包含后台 @debug
语句或其他形式的非用户可见的 IO,它可能会从非让出变为让出。如果在调用 f
期间,编译器遇到动态分派,其中必须 JIT 编译新代码,则可能会遇到让出点,并且对代码可能会发生任意数量的其他内部更改,从而导致它让出。
此外,Julia 的未来版本最终可能会从协作式任务模型转向抢占式任务模型,在这种情况下,让出点将不再是遇到这种竞争条件的唯一方式。
[3] | 关联性 |
关联性 是并行归约函数的一个重要属性,因为它意味着 op(a, op(b, c)) == op(op(a, b), c)
,因此结果不依赖于执行归约的顺序。
请注意,关联性与交换性不同,交换性是 op(a, b) == op(b, a)
的属性。这不是并行归约函数所必需的。