这篇文章将逐步介绍 Julia 的并行计算功能,以实现经典 *切割平面* 算法的异步并行版本,用于凸(非光滑)优化,并演示完整的流程,包括在 Amazon EC2 和大型多核服务器上运行。我将快速回顾切割平面算法,并将主要关注并行计算模式,所以即使你不熟悉优化方面的内容也不要担心。
切割平面算法是一种解决优化问题的算法
其中函数 是凸函数,但不一定是可微的。绝对值函数 和 1-范数 是典型的例子。重要的应用也来自 拉格朗日松弛。该算法的思想是用分段线性模型 来逼近函数 ,这些模型是根据在不同点评估 获得的信息构建的。我们迭代地对模型进行最小化以生成候选解点。
我们可以将算法描述为
选择起始点 。
对于 ,评估 并更新相应的模型 .
令下一个候选 为 的最小化值。
如果未收敛,则转到步骤 2。
如果评估成本很高,则该算法在步骤 2 处自然可并行化。步骤 3 中的最小化可以通过求解一个线性优化问题来计算,这通常非常快。(在此我指出,Julia 通过 JuliaOpt 提供了线性规划和其他优化求解器的接口。)
抽象出数学公式,我们可以使用以下 Julia 代码编写该算法。
# functions initialize, isconverged, solvesubproblem, and process implemented elsewhere
state, subproblems = initialize()
while !isconverged(state)
results = map(solvesubproblem,subproblems)
state, subproblems = process(state, results)
end
函数 solvesubproblem
对应于评估给定 和 的 (subproblems
的元素可以是元组 (i,x)
)。函数 process
对应于最小化步骤 3 中的模型,并产生一个新的状态和一组新的子问题需要解决。
请注意,该算法看起来非常像一个易于使用许多现有框架并行化的 map-reduce。实际上,在 Julia 中,我们只需将 map
替换为 pmap
(并行 map)即可。让我们考虑一个使并行化不那么简单的变化。
solvesubproblem
函数执行时间中的可变性可能导致负载不平衡并限制并行效率,因为工作线程会空闲等待新任务。如果 solvesubproblem
本身需要求解一个优化问题,或者如果工作线程和网络是共享的(这在云计算中很常见),则这种可变性自然会产生。
我们可以考虑切割平面算法的一个新变体来解决此问题。关键在于
当给定候选解的子问题的比例 已解决时,通过使用当前可用的任何信息生成一个新的候选解和相应的子问题集。
换句话说,我们生成新的任务来馈送给工作线程,而无需等待所有当前任务完成,从而使算法异步。该算法仍然收敛,尽管迭代总数可能会增加。有关更多详细信息,请参阅 Jeff Linderoth 和 Stephen Wright 的 这篇论文。
通过引入异步性,我们不能再使用一个漂亮的黑盒 pmap
函数,而必须更深入地研究并行实现。幸运的是,在 Julia 中这样做很容易。
Julia 基于单边消息传递实现分布式内存并行,其中进程将工作推送到其他进程(通过 remotecall
),结果由需要它们的进程检索(通过 fetch
)。诸如 @spawn
和 @parallel
之类的宏为这种低级功能提供了漂亮的语法。这种并行模型与 MPI 的典型 SIMD 样式非常不同。这两种方法在不同的上下文中都有用,我希望 Julia 的 MPI 包装器将来会出现(另请参见 此处)。
强烈建议阅读有关并行计算的 手册,我不会试图在本帖中重现它。相反,我们将深入研究并扩展它提供的示例之一。
Julia 中 pmap
的实现是
function pmap(f, lst)
np = nprocs() # determine the number of processors available
n = length(lst)
results = cell(n)
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
next_idx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
if p != myid() || np == 1
@spawnlocal begin
while true
idx = next_idx()
if idx > n
break
end
results[idx] = remotecall_fetch(p, f, lst[idx])
end
end
end
end
end
results
end
乍一看,这段代码并不特别直观。@spawnlocal
宏在主进程(例如进程 1)上创建一个 任务。每个任务都将工作馈送到相应的工作线程;调用 remotecall_fetch(p, f, lst[idx])
函数在进程 p
上调用 f
,并在完成后返回结果。任务是不可中断的,并且仅在特定点(例如 remotecall_fetch
)放弃控制。任务不能直接修改封闭作用域中的变量,但可以通过使用 next_idx
函数访问和修改 i
来实现相同的效果。任务惯用法取代了使用循环轮询每个工作线程结果的功能。
实现我们的异步算法只不过是对上述代码进行修改而已
# given constants n and 0 < alpha <= 1
# functions initialize and solvesubproblem defined elsewhere
np = nprocs()
state, subproblems = initialize()
converged = false
isconverged() = converged
function updatemodel(mysubproblem, result)
# store result
...
# decide whether to generate new subproblems
state.numback[mysubproblem.parent] += 1
if state.numback[mysubproblem.parent] >= alpha*n && !state.didtrigger[mysubproblem.parent]
state.didtrigger[mysubproblem.parent] = true
# generate newsubproblems by solving linear optimization problem
...
if ... # convergence test
converged = true
else
append!(subproblems, newsubproblems)
push!(state.didtrigger, false)
push!(state.numback, 0)
# ensure that for s in newsubproblems, s.parent == length(state.numback)
end
end
end
@sync begin
for p=1:np
if p != myid() || np == 1
@spawnlocal begin
while !isconverged()
if length(subproblems) == 0
# no more subproblems but haven't converged yet
yield()
continue
end
mysubproblem = shift!(subproblems) # pop subproblem from queue
result = remotecall_fetch(p, solvesubproblem, mysubproblem)
updatemodel(mysubproblem, result)
end
end
end
end
end
其中 state
是一个定义为以下类型的实例
type State
didtrigger::Vector{Bool}
numback::Vector{Int}
...
end
@sync
块内的代码结构几乎没有区别,异步逻辑封装在局部 updatemodel
函数中,该函数有条件地生成新的子问题。Julia 的优势在于像 pmap
这样的函数是在 Julia 本身中实现的,因此进行此类修改特别简单。
现在是有趣的部分。完整的切割平面算法(以及其他变体)在 JuliaBenders 中实现。该代码专门用于 随机规划,其中切割平面算法称为 L 形方法 或 Benders 分解,用于分解大型线性优化问题的解。在这里,solvesubproblem
需要解决一个相对较小的线性优化问题。测试实例取自 前面提到的论文。
我们首先在一个大型多核服务器上运行。runals.jl
(异步 L 形)文件包含我们将使用的算法。其用法为
julia runals.jl [data source] [num subproblems] [async param] [block size]
其中 [num subproblems]
是如上所示的 ,[async param]
是比例 。通过设置 ,我们得到同步算法。对于异步版本,我们将取 。[block size]
参数控制一次发送给工作线程的子问题数量(在前面的代码中,此值始终为 1)。在我们的实验中,我们将使用 4000 个子问题。
要在共享内存机器上运行多个 Julia 进程,我们将 -p N
选项传递给 julia
可执行文件,这将启动 N
个系统进程。要使用 10 个工作线程执行异步版本,我们运行
julia -p 12 runals.jl Data/storm 4000 0.6 30
请注意,我们启动了 12 个进程。这些是 10 个工作线程、一个主线程(分发任务)以及另一个执行主线程计算的进程(一个额外的改进,上面没有描述)。下表显示了各种运行的结果。
同步 | 异步 | |||
工作线程数 | 速度 | 效率 | 速度 | 效率 |
10 | 154 | 基准 | 166 | 基准 |
20 | 309 | 100.3% | 348 | 105% |
40 | 517 | 84% | 654 | 98% |
60 | 674 | 73% | 918 | 92% |
表:在共享内存 8x Xeon E7-8850 服务器上的结果。工作线程对应于单个内核。速度是每秒解决的子问题数。效率计算为获得的理想并行加速的百分比。使用 20 个工作线程观察到的超线性扩展可能是系统伪像。
为了在 EC2 上运行,还需要跨越几个障碍。首先,我们必须构建一个安装了 Julia 的系统镜像 (AMI)。Julia 通过 ssh 连接到工作线程,因此我发现将我的 EC2 ssh 密钥放在 AMI 上并还在 /etc/ssh/ssh_config
中设置 StrictHostKeyChecking no
以在连接到新工作线程时禁用真实性提示很有用。有人可能会纠正我,说明这是否是正确的方法。
假设我们已有一个 AMI,我们可以启动实例。我为主线程使用了 m3.xlarge 实例,为工作线程使用了 m1.medium 实例。(注意:您可以通过使用竞价型实例节省大量资金。)
要在启动时添加远程工作线程,Julia 通过 --machinefile
选项接受包含主机名列表的文件。我们可以使用 EC2 API 工具(Ubuntu 包 ec2-api-tools
)和以下命令轻松生成它
ec2-describe-instances | grep running | awk '{ print $5; }' > mfile
然后,我们可以在主线程实例上运行
julia --machinefile mfile runatr.jl Data/storm 4000 0.6 30
下表显示了各种运行的结果。
同步 | 异步 | |||
工作线程数 | 速度 | 效率 | 速度 | 效率 |
10 | 149 | 基准 | 151 | 基准 |
20 | 289 | 97% | 301 | 99.7% |
40 | 532 | 89% | 602 | 99.5% |
表:在 Amazon EC2 上的结果。工作线程对应于单个 m1.medium 实例。主线程在 m3.xlarge 实例上运行。
在两种架构上,异步版本都能够以更高的速度解决子问题,并且具有显著更好的并行效率。在 EC2 上的扩展性优于共享内存服务器,这可能是因为子问题计算受内存限制,因此分布式内存架构上的性能更好。无论如何,使用 Julia,我们可以在两种架构上轻松进行实验。
为 2013 年 1 月在麻省理工学院举行的 Julia IAP 会议准备了一个更详细的教程。