分布式数值优化

2013年4月5日 | Miles Lubin

这篇文章将逐步介绍 Julia 的并行计算功能,以实现经典 *切割平面* 算法的异步并行版本,用于凸(非光滑)优化,并演示完整的流程,包括在 Amazon EC2 和大型多核服务器上运行。我将快速回顾切割平面算法,并将主要关注并行计算模式,所以即使你不熟悉优化方面的内容也不要担心。

切割平面算法

切割平面算法是一种解决优化问题的算法

minxRdi=1nfi(x)\min_{x \in \mathbb R^d} \sum_{i=1}^n f_i(x)

其中函数 fif_i 是凸函数,但不一定是可微的。绝对值函数 x|x| 和 1-范数 x1 ||x|| _ 1 是典型的例子。重要的应用也来自 拉格朗日松弛。该算法的思想是用分段线性模型 mi m_i 来逼近函数 fif_i ,这些模型是根据在不同点评估 fi f_i 获得的信息构建的。我们迭代地对模型进行最小化以生成候选解点。

我们可以将算法描述为

  1. 选择起始点 x x

  2. 对于 i=1,,ni = 1,\ldots,n,评估 fi(x)f_i(x) 并更新相应的模型 mi m_i .

  3. 令下一个候选 x x i=1nmi(x) \sum_{i=1}^n m_i(x) 的最小化值。

  4. 如果未收敛,则转到步骤 2。

如果评估fi(x) f_i(x) 成本很高,则该算法在步骤 2 处自然可并行化。步骤 3 中的最小化可以通过求解一个线性优化问题来计算,这通常非常快。(在此我指出,Julia 通过 JuliaOpt 提供了线性规划和其他优化求解器的接口。)

抽象出数学公式,我们可以使用以下 Julia 代码编写该算法。

# functions initialize, isconverged, solvesubproblem, and process implemented elsewhere
state, subproblems = initialize()
while !isconverged(state)
    results = map(solvesubproblem,subproblems)
    state, subproblems = process(state, results)
end

函数 solvesubproblem 对应于评估给定 i i x x fi(x)f_i(x)subproblems 的元素可以是元组 (i,x))。函数 process 对应于最小化步骤 3 中的模型,并产生一个新的状态和一组新的子问题需要解决。

请注意,该算法看起来非常像一个易于使用许多现有框架并行化的 map-reduce。实际上,在 Julia 中,我们只需将 map 替换为 pmap(并行 map)即可。让我们考虑一个使并行化不那么简单的变化。

异步变体

solvesubproblem 函数执行时间中的可变性可能导致负载不平衡并限制并行效率,因为工作线程会空闲等待新任务。如果 solvesubproblem 本身需要求解一个优化问题,或者如果工作线程和网络是共享的(这在云计算中很常见),则这种可变性自然会产生。

我们可以考虑切割平面算法的一个新变体来解决此问题。关键在于

换句话说,我们生成新的任务来馈送给工作线程,而无需等待所有当前任务完成,从而使算法异步。该算法仍然收敛,尽管迭代总数可能会增加。有关更多详细信息,请参阅 Jeff Linderoth 和 Stephen Wright 的 这篇论文

通过引入异步性,我们不能再使用一个漂亮的黑盒 pmap 函数,而必须更深入地研究并行实现。幸运的是,在 Julia 中这样做很容易。

Julia 中的并行实现

Julia 基于单边消息传递实现分布式内存并行,其中进程将工作推送到其他进程(通过 remotecall),结果由需要它们的进程检索(通过 fetch)。诸如 @spawn@parallel 之类的宏为这种低级功能提供了漂亮的语法。这种并行模型与 MPI 的典型 SIMD 样式非常不同。这两种方法在不同的上下文中都有用,我希望 Julia 的 MPI 包装器将来会出现(另请参见 此处)。

强烈建议阅读有关并行计算的 手册,我不会试图在本帖中重现它。相反,我们将深入研究并扩展它提供的示例之一。

Julia 中 pmap 的实现是

function pmap(f, lst)
    np = nprocs()  # determine the number of processors available
    n = length(lst)
    results = cell(n)
    i = 1
    # function to produce the next work item from the queue.
    # in this case it's just an index.
    next_idx() = (idx=i; i+=1; idx)
    @sync begin
        for p=1:np
            if p != myid() || np == 1
                @spawnlocal begin
                    while true
                        idx = next_idx()
                        if idx > n
                            break
                        end
                        results[idx] = remotecall_fetch(p, f, lst[idx])
                    end
                end
            end
        end
    end
    results
end

乍一看,这段代码并不特别直观。@spawnlocal 宏在主进程(例如进程 1)上创建一个 任务。每个任务都将工作馈送到相应的工作线程;调用 remotecall_fetch(p, f, lst[idx]) 函数在进程 p 上调用 f,并在完成后返回结果。任务是不可中断的,并且仅在特定点(例如 remotecall_fetch)放弃控制。任务不能直接修改封闭作用域中的变量,但可以通过使用 next_idx 函数访问和修改 i 来实现相同的效果。任务惯用法取代了使用循环轮询每个工作线程结果的功能。

实现我们的异步算法只不过是对上述代码进行修改而已

# given constants n and 0 < alpha <= 1
# functions initialize and solvesubproblem defined elsewhere
np = nprocs()
state, subproblems = initialize()
converged = false
isconverged() = converged
function updatemodel(mysubproblem, result)
    # store result
    ...
    # decide whether to generate new subproblems
    state.numback[mysubproblem.parent] += 1
    if state.numback[mysubproblem.parent] >= alpha*n && !state.didtrigger[mysubproblem.parent]
        state.didtrigger[mysubproblem.parent] = true
        # generate newsubproblems by solving linear optimization problem
        ...
        if ... # convergence test
            converged = true
        else
            append!(subproblems, newsubproblems)
            push!(state.didtrigger, false)
            push!(state.numback, 0)
            # ensure that for s in newsubproblems, s.parent == length(state.numback)
        end
    end
end

@sync begin
    for p=1:np
        if p != myid() || np == 1
            @spawnlocal begin
                while !isconverged()
                    if length(subproblems) == 0
                        # no more subproblems but haven't converged yet
                        yield()
                        continue
                    end
                    mysubproblem = shift!(subproblems) # pop subproblem from queue
                    result = remotecall_fetch(p, solvesubproblem, mysubproblem)
                    updatemodel(mysubproblem, result)
                end
            end
        end
    end
end

其中 state 是一个定义为以下类型的实例

type State
    didtrigger::Vector{Bool}
    numback::Vector{Int}
    ...
end

@sync 块内的代码结构几乎没有区别,异步逻辑封装在局部 updatemodel 函数中,该函数有条件地生成新的子问题。Julia 的优势在于像 pmap 这样的函数是在 Julia 本身中实现的,因此进行此类修改特别简单。

运行它

现在是有趣的部分。完整的切割平面算法(以及其他变体)在 JuliaBenders 中实现。该代码专门用于 随机规划,其中切割平面算法称为 L 形方法 或 Benders 分解,用于分解大型线性优化问题的解。在这里,solvesubproblem 需要解决一个相对较小的线性优化问题。测试实例取自 前面提到的论文

我们首先在一个大型多核服务器上运行。runals.jl(异步 L 形)文件包含我们将使用的算法。其用法为

julia runals.jl [data source] [num subproblems] [async param] [block size]

其中 [num subproblems] 是如上所示的 nn[async param] 是比例 α\alpha。通过设置 α=1\alpha = 1,我们得到同步算法。对于异步版本,我们将取 α=0.6\alpha = 0.6[block size] 参数控制一次发送给工作线程的子问题数量(在前面的代码中,此值始终为 1)。在我们的实验中,我们将使用 4000 个子问题。

要在共享内存机器上运行多个 Julia 进程,我们将 -p N 选项传递给 julia 可执行文件,这将启动 N 个系统进程。要使用 10 个工作线程执行异步版本,我们运行

julia -p 12 runals.jl Data/storm 4000 0.6 30

请注意,我们启动了 12 个进程。这些是 10 个工作线程、一个主线程(分发任务)以及另一个执行主线程计算的进程(一个额外的改进,上面没有描述)。下表显示了各种运行的结果。

同步 异步
工作线程数 速度 效率速度 效率
10 154 基准 166 基准
20 309 100.3% 348 105%
40 517 84% 654 98%
60 674 73% 918 92%

表:在共享内存 8x Xeon E7-8850 服务器上的结果。工作线程对应于单个内核。速度是每秒解决的子问题数。效率计算为获得的理想并行加速的百分比。使用 20 个工作线程观察到的超线性扩展可能是系统伪像。

为了在 EC2 上运行,还需要跨越几个障碍。首先,我们必须构建一个安装了 Julia 的系统镜像 (AMI)。Julia 通过 ssh 连接到工作线程,因此我发现将我的 EC2 ssh 密钥放在 AMI 上并还在 /etc/ssh/ssh_config 中设置 StrictHostKeyChecking no 以在连接到新工作线程时禁用真实性提示很有用。有人可能会纠正我,说明这是否是正确的方法。

假设我们已有一个 AMI,我们可以启动实例。我为主线程使用了 m3.xlarge 实例,为工作线程使用了 m1.medium 实例。(注意:您可以通过使用竞价型实例节省大量资金。)

要在启动时添加远程工作线程,Julia 通过 --machinefile 选项接受包含主机名列表的文件。我们可以使用 EC2 API 工具(Ubuntu 包 ec2-api-tools)和以下命令轻松生成它

ec2-describe-instances | grep running | awk '{ print $5; }' > mfile

然后,我们可以在主线程实例上运行

julia --machinefile mfile runatr.jl Data/storm 4000 0.6 30

下表显示了各种运行的结果。

同步 异步
工作线程数 速度 效率速度 效率
10 149 基准 151 基准
20 289 97% 301 99.7%
40 532 89% 602 99.5%

表:在 Amazon EC2 上的结果。工作线程对应于单个 m1.medium 实例。主线程在 m3.xlarge 实例上运行。

在两种架构上,异步版本都能够以更高的速度解决子问题,并且具有显著更好的并行效率。在 EC2 上的扩展性优于共享内存服务器,这可能是因为子问题计算受内存限制,因此分布式内存架构上的性能更好。无论如何,使用 Julia,我们可以在两种架构上轻松进行实验。

进一步阅读

为 2013 年 1 月在麻省理工学院举行的 Julia IAP 会议准备了一个更详细的教程

Creative Commons License
分布式数值优化Miles Lubin 创作,并根据 知识共享署名 4.0 国际许可协议 许可。