原文: http://numba.pydata.org/numba-doc/latest/user/parallel.html
为 jit()
设置并行选项可启用 Numba 转换过程,该过程尝试自动并行化并对函数(部分)执行其他优化。目前,此功能仅适用于 CPU。
用户定义的函数内的一些操作,例如,向数组添加标量值已知具有并行语义。用户程序可以包含许多这样的操作,并且虽然每个操作可以单独并行化,但是这种方法通常由于较差的高速缓存行为而具有低廉的性能。相反,通过自动并行化,Numba 尝试在用户程序中识别此类操作,并将相邻的操作融合在一起,以形成一个或多个并行自动运行的内核。该过程完全自动化而无需修改用户程序,这与 Numba 的 vectorize()
或 guvectorize()
机制形成对比,其中需要手动操作来创建并行内核。
在本节中,我们列出了所有具有并行语义且我们尝试并行化的数组操作。
-
案例研究支持的所有 numba 数组操作:数组表达式,包括 Numpy 数组之间,数组和标量之间的常用算术函数,以及 Numpy ufuncs。它们通常被称为<cite>元素</cite>或<cite>逐点</cite>数组操作:
> * 一元算子:
+
-
~
> * 二元算子:+
-
*
/
/?
%
|
>>
^
<<
&
**
//
> * 比较运算符:==
!=
<
<=
>
>=
> * nopython 模式支持的 Numpy ufuncs。 > * 用户定义DUFunc
至vectorize()
。 -
Numpy 减少函数
sum
,prod
,min
,max
,argmin
和argmax
。此外,数组数学函数mean
,var
和std
。 -
Numpy 数组创建函数
zeros
,ones
,arange
,linspace
和几个随机函数(rand,randn,ranf,random_sample,sample,random,standard_normal,chisquare,weibull,power,geometric,exponential,poisson ,瑞利,正常,均匀,贝塔,二项式,f,伽玛,对数正态,拉普拉斯,兰丁,三角形)。 -
Numpy
dot
函数在矩阵和向量之间,或两个向量之间。在所有其他情况下,使用 Numba 的默认实现。 -
当操作数具有匹配的尺寸和大小时,上述操作也支持多维数组。不支持具有混合维度或大小的阵列之间的 Numpy 广播的完整语义,也不支持所选维度上的减少。
-
数组赋值,其中目标是使用切片或布尔数组的数组选择,并且指定的值是标量或其他选择,其中切片范围或位阵列被推断为兼容。
-
functools
的reduce
运算符支持指定 1D Numpy 数组的并行减少,但初始值参数是必需的。
代码转换传递的另一个特性(当parallel=True
时)支持显式并行循环。可以使用 Numba 的prange
而不是range
来指定循环可以并行化。除了支持的减少之外,用户需要确保循环没有交叉迭代依赖性。
如果变量由二元函数/运算符使用其在循环体中的先前值更新,则自动推断减少。对于+=
和*=
运算符,自动推断减少的初始值。对于其他函数/运算符,reduce 变量应在进入prange
循环之前保持标识值。对于标量和任意维度的数组,支持以这种方式减少。
下面的示例演示了一个带有缩减的并行循环(A
是一维 Numpy 数组):
from numba import njit, prange
@njit(parallel=True)
def prange_test(A):
s = 0
# Without "parallel=True" in the jit-decorator
# the prange statement is equivalent to range
for i in prange(A.shape[0]):
s += A[i]
return s
以下示例演示了二维数组的产品缩减:
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def two_d_array_reduction_prod(n):
shp = (13, 17)
result1 = 2 * np.ones(shp, np.int_)
tmp = 2 * np.ones_like(result1)
for i in prange(n):
result1 *= tmp
return result1
在本节中,我们举例说明此功能如何帮助并行化 Logistic 回归:
@numba.jit(nopython=True, parallel=True)
def logistic_regression(Y, X, w, iterations):
for i in range(iterations):
w -= np.dot(((1.0 / (1.0 + np.exp(-Y * np.dot(X, w))) - 1.0) * Y), X)
return w
我们不会讨论算法的细节,而是关注该程序如何使用自动并行化:
- 输入
Y
是大小为N
的向量,X
是N x D
矩阵,w
是大小为D
的向量。 - 函数体是一个迭代循环,它更新变量
w
。循环体由一系列向量和矩阵运算组成。 - 内部
dot
操作产生一个大小为N
的向量,然后是标量和大小为N
的向量之间的一系列算术运算,或两个大小为N
的向量。 - 外部
dot
产生一个大小为D
的向量,然后在变量w
上进行就地数组减法。 - 通过自动并行化,将生成大小为
N
的数组的所有操作融合在一起,成为单个并行内核。这包括内部dot
操作和后面的所有逐点数组操作。 - 外部
dot
操作产生不同维度的结果数组,并且不与上述内核融合。
这里,利用并行硬件唯一需要的是为 jit()
设置并行选项,而不对logistic_regression
功能本身进行修改。如果我们使用 guvectorize()
给出等价并行实现,则需要进行普遍的更改,重写代码以提取可并行化的内核计算,这既繁琐又具有挑战性。
注意
目前,并非所有并行变换和功能都可以通过代码生成过程进行跟踪。偶尔可能会丢失有关某些循环或变换的诊断信息。
jit()
的并行选项可以生成有关自动并行化修饰代码的变换的诊断信息。这个信息可以通过两种方式访问,第一种是通过设置环境变量 NUMBA_PARALLEL_DIAGNOSTICS
,第二种是通过调用 parallel_diagnostics()
,两种方法都给出相同的信息并打印至STDOUT
。诊断信息中的详细程度由 1 到 4 之间的整数参数控制,其中 1 表示最小,4 表示最多。例如:
@njit(parallel=True)
def test(x):
n = x.shape[0]
a = np.sin(x)
b = np.cos(a * a)
acc = 0
for i in prange(n - 2):
for j in prange(n - 1):
acc += b[i] + b[j + 1]
return acc
test(np.arange(10))
test.parallel_diagnostics(level=4)
生产:
================================================================================
======= Parallel Accelerator Optimizing: Function test, example.py (4) =======
================================================================================
Parallel loop listing for Function test, example.py (4)
--------------------------------------|loop #ID
@njit(parallel=True) |
def test(x): |
n = x.shape[0] |
a = np.sin(x)---------------------| #0
b = np.cos(a * a)-----------------| #1
acc = 0 |
for i in prange(n - 2):-----------| #3
for j in prange(n - 1):-------| #2
acc += b[i] + b[j + 1] |
return acc |
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
Trying to fuse loops #0 and #1:
- fusion succeeded: parallel for-loop #1 is fused into for-loop #0.
Trying to fuse loops #0 and #3:
- fusion failed: loop dimension mismatched in axis 0\. slice(0, x_size0.1, 1)
!= slice(0, $40.4, 1)
----------------------------- Before Optimization ------------------------------
Parallel region 0:
+--0 (parallel)
+--1 (parallel)
Parallel region 1:
+--3 (parallel)
+--2 (parallel)
--------------------------------------------------------------------------------
------------------------------ After Optimization ------------------------------
Parallel region 0:
+--0 (parallel, fused with loop(s): 1)
Parallel region 1:
+--3 (parallel)
+--2 (serial)
Parallel region 0 (loop #0) had 1 loop(s) fused.
Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part
of the larger parallel loop (#3).
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
---------------------------Loop invariant code motion---------------------------
Instruction hoisting:
loop #0:
Failed to hoist the following:
dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99)
dependency: $0.6.11 = getattr(value=$0.5, attr=sin)
dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None)
dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
dependency: $0.10.20 = getattr(value=$0.9, attr=cos)
dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None)
loop #3:
Has the following hoisted:
$const58.3 = const(int, 1)
$58.4 = _n_23 - $const58.3
--------------------------------------------------------------------------------
为了帮助用户不熟悉使用并行选项时进行的转换,并帮助理解后续章节,提供了以下定义:
-
Loop fusion
循环融合是一种技术,其中具有等效边界的循环可以在某些条件下组合以产生具有较大主体的循环(旨在改善数据局部性)。
-
Loop serialization
当在另一个
prange
驱动的回路内存在任意数量的prange
驱动回路时,会发生回路串行化。在这种情况下,所有prange
循环的最外层并行执行,并且任何内部prange
循环(嵌套或其他)被视为基于标准range
的循环。实质上,嵌套并行性不会发生。 -
Loop invariant code motion
循环不变代码运动是一种优化技术,它分析循环以查找可以移动到循环体外的语句而不改变执行循环的结果,然后这些语句被“提升”出循环保存重复计算。
-
Allocation hoisting
分配提升是循环不变代码运动的一种特殊情况,由于一些常见的 NumPy 分配方法的设计,这是可能的。这个技术的解释最好由一个例子驱动:
@njit(parallel=True) def test(n): for i in prange(n): temp = np.zeros((50, 50)) # <--- Allocate a temporary array with np.zeros() for j in range(50): temp[j, j] = i # ...do something with temp
在内部,这被转换为大致如下:
@njit(parallel=True) def test(n): for i in prange(n): temp = np.empty((50, 50)) # <--- np.zeros() is rewritten as np.empty() temp[:] = 0 # <--- and then a zero initialisation for j in range(50): temp[j, j] = i # ...do something with temp
然后吊装后:
@njit(parallel=True) def test(n): temp = np.empty((50, 50)) # <--- allocation is hoisted as a loop invariant as `np.empty` is considered pure for i in prange(n): temp[:] = 0 # <--- this remains as assignment is a side effect for j in range(50): temp[j, j] = i # ...do something with temp
可以看出
np.zeros
分配被分成一个分配和一个赋值,然后分配从i
中的循环中提升,这产生了更高效的代码,因为分配只发生一次。
该报告分为以下几个部分:
-
Code annotation
这是第一部分,包含带有循环的源代码,循环具有标识和枚举的并行语义。源代码右侧的
loop #ID
列与已识别的并行循环对齐。从示例中,#0
是np.sin
,#1
是np.cos
,#2
和#3
是prange()
:Parallel loop listing for Function test, example.py (4) --------------------------------------|loop #ID @njit(parallel=True) | def test(x): | n = x.shape[0] | a = np.sin(x)---------------------| #0 b = np.cos(a * a)-----------------| #1 acc = 0 | for i in prange(n - 2):-----------| #3 for j in prange(n - 1):-------| #2 acc += b[i] + b[j + 1] | return acc |
值得注意的是,循环 ID 按它们被发现的顺序枚举,这不一定与源中存在的顺序相同。此外,还应注意,并行变换使用静态计数器进行循环 ID 索引。因此,由于使用相同的计数器进行对用户不可见的内部优化/变换,循环 ID 索引可能不会从 0 开始。
-
Fusing loops
本节介绍在融合发现的循环时所做的尝试,注意哪些成功哪些失败。在未融合的情况下,给出了一个原因(例如,依赖于其他数据)。从示例:
--------------------------------- Fusing loops --------------------------------- Attempting fusion of parallel loops (combines loops with similar properties)... Trying to fuse loops #0 and #1: - fusion succeeded: parallel for-loop #1 is fused into for-loop #0. Trying to fuse loops #0 and #3: - fusion failed: loop dimension mismatched in axis 0\. slice(0, x_size0.1, 1) != slice(0, $40.4, 1)
可以看出,环
#0
和#1
的融合被尝试并且这成功(两者都基于x
的相同尺寸)。在#0
和#1
成功融合后,尝试在#0
(现在包括融合的#1
环)和#3
之间进行融合。这种融合失败是因为存在环尺寸不匹配,#0
是尺寸x.shape
而#3
是尺寸x.shape[0] - 2
。 -
Before Optimization
本节显示了在进行任何优化之前代码中并行区域的结构,但是具有与其最终并行区域相关联的循环(这是在优化输出之前/之后直接进行比较)。如果存在不能融合的循环,则可能存在多个并行区域,在这种情况下,每个区域内的代码将并行执行,但每个并行区域将顺序运行。从示例:
Parallel region 0: +--0 (parallel) +--1 (parallel) Parallel region 1: +--3 (parallel) +--2 (parallel)
正如<cite>融合循环</cite>部分所提到的,代码中必然存在两个并行区域。第一个包含循环
#0
和#1
,第二个包含#3
和#2
,所有循环都标记为parallel
,因为尚未进行优化。 -
After Optimization
本节显示优化发生后代码中并行区域的结构。同样,平行区域用它们相应的循环枚举,但是记录融合或序列化的这个时间循环并给出摘要。从示例:
Parallel region 0: +--0 (parallel, fused with loop(s): 1) Parallel region 1: +--3 (parallel) +--2 (serial) Parallel region 0 (loop #0) had 1 loop(s) fused. Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part of the larger parallel loop (#3).
可以注意到,并行区域 0 包含循环
#0
,并且如<cite>定影循环</cite>部分所示,循环#1
融合到循环#0
中。还可以注意到,并行区域 1 包含循环#3
并且该循环#2
(内部prange()
)已被序列化以在循环体#3
中执行。 -
Loop invariant code motion
此部分显示优化发生后的每个循环:
- 未能提升的指示和失败的原因(依赖/不纯)。
- 悬挂的指示。
- 任何可能发生的分配吊装。
从示例:
Instruction hoisting: loop #0: Failed to hoist the following: dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99) dependency: $0.6.11 = getattr(value=$0.5, attr=sin) dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None) dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9 dependency: $0.10.20 = getattr(value=$0.9, attr=cos) dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None) loop #3: Has the following hoisted: $const58.3 = const(int, 1) $58.4 = _n_23 - $const58.3
首先要注意的是,此信息适用于高级用户,因为它指的是正在转换的函数的 Numba IR 。例如,示例源中的表达式
a * a
部分转换为 IR 中的表达式$arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
,这显然无法从loop #0
中提升,因为它不是循环不变的!而在loop #3
中,表达式$const58.3 = const(int, 1)
来自源b[j + 1]
,数字1
显然是一个常数,因此可以从循环中提升。
也可以看看