Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MilliSort and MilliQuery: Large-Scale Data-Intensive Computing in Milliseconds #6

Open
linxuyalun opened this issue Apr 17, 2021 · 8 comments
Labels
data Big data computing framework done Finish reading novel This paper is novel and cool

Comments

@linxuyalun
Copy link
Owner

https://www.usenix.org/conference/nsdi21/presentation/li-yilong

@linxuyalun linxuyalun added todo todo novel This paper is novel and cool data Big data computing framework labels Apr 17, 2021
@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 17, 2021

大数据处理框架

Datacenter computing 的好处在于运行大规模运算的应用,利用数百或数千台机器共同完成一项任务。使用 MapReduce 和 Spark 等框架,开发人员可以轻松创建各种领域的应用,如大规模数据分析。大规模应用的执行时间都比较长:几秒或几分钟。这是必要的,因为涉及分配和协调集群。同样,MapReduce 和 Spark 等框架传统上都是在非常大的数据块上运行,以便摊销高网络延迟。因此,它们无法应用于实时任务。对于这类框架而言,实时查询必须返回预先计算的结果,例如昨天批处理运行中产生的结果。这意味着查询必须事先仔细规划,不能轻易支持临时查询。

诸如 Flink 或 Spark Streaming 这类流式框架对传入的数据进行实时操作,它们通过将新数据传输到提前涉及好的查询或转换语句中。为了支持实时查询,每个事件引发的计算规模相当有限。

近年来,serverless 平台,如AWS Lambda、Azure Cloud Functions 和 Google Cloud Functions 已经使得在数据中心中运行短时任务(小到几百毫秒)成为可能。然而,这些环境中的执行单位是单个函数调用。例如,官方不支持 lambdas 之间的直接通信,现有的工作方案具有高延迟和低带宽。

@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 17, 2021

Goals

这篇论文从两个方面探讨了扩展 serverless 计算的可能性:第一,进一步缩短时间尺度;第二,重新引入规模,使大量服务器可以一起工作。论文使用术语 flash burst 来描述一种寿命很短却能利用大量服务器的计算。flash burst 提供了实时分析大量数据的潜力,这意味着可以创建新的应用程序,在大型数据集上实时执行自定义查询,而无需提前数小时或数天预测查询

论文着手回答以下三个问题:

  1. 什么是有意义的 flash burst 可以运行的最小时间尺度?
  2. 在这样的时间尺度下,可以利用的最大数量的服务器是多少?
  3. 当前系统的哪些方面限制了 flash burst 的 duration 和规模?

论文重点讨论问题的一个方面:可能用于 flash burst 的核心算法是否能在极小的时间尺度上高效运行。不讨论加载应用程序和数据所需的时间,也不讨论如何在面对短时计算的情况下实现高资源利用率;这些问题留待今后的工作。

假设小到1毫秒的 timescale 是可能的,所以将其作为最初的目标

论文实现了两个应用,它们描述了期望在 flash burst 中常见的计算和通信模式。

  1. 第一个应用是 MilliSort:给定数据中心的无限资源,在一毫秒内可以排序的最大数量是多少?
  2. 第二个应用程序是 MilliQuery,它由 Google BigQuery 的教程中的三个代表性 SQL 查询组成。这些查询的范围从简单的 scan-filter-aggregate 查询到需要多次 shuffle 的分布式连接。与 MilliSort 一样,目标是了解在一毫秒左右的时间尺度下,可以分析多少数据,以及可以利用多少服务器。

简单的说,这篇论文的主要贡献首先是实现了上述两个 example apps 帮助理解 flash burst 这个概念,然后根据实验结果从 Feasibility,Scaling 和 Limiting Factors 三个角度去分析。

@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 17, 2021

那么,一毫秒能做什么?

  • 用 8 核排序 40,000 个 10 byte 的 key;
  • 内存到内存顺序拷贝 5 MB 的数据;
  • 使用 40 Gbps 网卡接收/发送 5 MB 数据;
  • 通过 kernel bypass(绕开内核),用一个核进行 300 次 back-to-back(直接传输) RPC 调用;
  • 一个核,接收/发送 2 到 5 千个 small message;
  • 一个核发生 10,000 次 back-to-back L3 Cache miss。

可以看出一个最重要的限制是,每个服务器只能操作少量的数据(几 MB 的数量级)。鉴于服务器数量众多,每台服务器的数据量较小,数据必须在整个 flash burst 期间保持均匀分布。哪怕是一小部分数据积累在一台服务器上,进入该服务器的网络链路就会成为瓶颈

@linxuyalun
Copy link
Owner Author

flash bursts 面临的挑战

Limited data per server

如上个 section 所述。

Coordination cost

考虑到每台服务器只能访问少量的数据,flash burst 的整体规模将受到可以利用的服务器数量的限制。但是,由于时间尺度小,很难协调非常多的服务器,在某些规模下,一毫秒的时间甚至不足以通知所有服务器开始工作。因此,coordination cost 在 flash burst 中起着根本性的作用,因为它们限制了规模。"coordination"包括让所有的服务器参与进来,确定每个服务器的工作分配,以及对算法的各个阶段进行排序等活动。现有的大规模应用如 Spark,每台服务器存储的数据量要大得多,而且运行时间也较长,这使得 coordination cost 不那么重要。

Multiple communication costs

对于许多现有的大规模应用,唯一重要的通信成本是网络带宽。然而对于 flash burst 来说,三种不同的成本可能变得很重要。除了带宽(在发送大数据块时很重要)和延迟(在发送小数据块时很重要)之外,第三种成本在 flash burst 中扮演着重要的角色: per-message overhead(发送和接收短消息所需的CPU时间)。当一个服务器有一系列可以并发发送到其他服务器的小请求时,per-message overhead 就会发挥作用;它限制了一系列消息的发出速度。per-message overhead 在 flash burst 中特别重要,因为它们支配着 Group communication 的成本(下面讨论),而 Group communication 又支配着 Coordination 的成本。

Group communication

如果集群要紧密合作,服务器需要频繁地、小块地交换数据。但是,在 flash burst 中,如果有几百台或几千台服务器在很小的时间范围内工作,那么每台服务器直接与其他所有服务器进行通信是不现实的。例如,如果一台服务器通过向其他 1000 台服务器分别发送一个小消息来广播数据,由于 per-message overhead,广播将消耗相当一部分时间。

因此,Group communication 在 flash burst 中起着重要作用。在 Group communication 中,一个集群中的许多或所有服务器同时传输数据,以执行整个集群的目标。四种机制在 MilliSort 和 MilliQuery 中发挥作用。

  • Broadcast:单台 server 将数据分发到 group 中的每台 server ;
  • Gather:单台 server 必须从 group 中的每台服务器收集不同的数据;
  • All-gather:Group 下的每台 server 都 Gather 了;
  • Shuffle:最初每台 server 都为 group 中的其他每台 server 存储一个单独的数据项;shuffle 阶段必须将所有数据传送到预定 server。

Group communication 提供了两个好处。首先,它利用多台 server 同时运行来加快通信速度;例如,通过树形结构分发消息,可以用几台 server 更快地完成一次广播。其次,它有时可以用几个较大的消息来代替许多小消息;这就减少了每个消息开销的影响。例如,All-gather 的一种实现对 M 台 server 来说,可以只需要 MlogM 消息,而如果每台 server 与其他 server 独立通信,则需要 M方的消息。

@linxuyalun linxuyalun added wip Read in progress and removed todo todo labels Apr 17, 2021
@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 17, 2021

MilliSort and MilliQuery Applications

目前还没有 flash burst 应用(鉴于没有能够支持它们的基础设施)。论文实现了两个小应用,试图去利用这些 demo 分析 flash burst 应用中的 dwarfs。

一种通信模式,在各种大规模应用中被普遍使用,并占了它们性能的大部分。这些模式被称为 "dwarfs"。例如,矩阵运算、排序和统计计算是大数据和 AI 工作的 dwarfs。

MilliSort

排序在许多分布式计算中扮演着重要的角色。例如,排序可以作为一个数据预处理步骤,支持高效的范围查询,改善图分区的数据局部性,以及执行负载平衡。排序也是非常具有挑战性的,因为它需要密集和不可预测的通信(任何记录都有可能在任何服务器上结束)。这将给算法和底层基础设施带来挑战。

对于 MilliSort 来说,目标是在一毫秒左右的时间内,使用数据中心内任意数量的服务器,对尽可能多的小记录进行排序。每条记录包含 100 个 bytes,由一个 10bytes 的 key 和一个 90bytes 的 value 组成。在开始 benchmark 之前,MilliSort 应用程序被预先加载,未排序的记录被_均匀_地分布在DRAM中的可用机器中。每台服务器上的数据都是按照下述结构,将所有 key 放在一个内存块中,所有 value 放在另一个内存块中。完成后,必须将数据在相同的服务器上重新分配,进行排序,在排序结束时,每个服务器上的数据被结构化为两个内存块,一个内存块包含按排序顺序排列的键,另一个内存块包含与其键相同顺序的值。

MilliQuery

MilliQuery 来自 Google 的 BigQuery 文档的三个 SQL 查询的集合,将这些查询统称为 MilliQuery,许多 flash burst 应用将执行数据分析,以提供临时查询的实时结果;MilliQuery 的目标是捕捉这些应用中常见的 dwarfs。具体如下三种:

  • Q1:按语言统计维基百科上的文章浏览量(最多有几百种不同的语言)。
  • Q2:按照对维基百科文章的编辑次数,找出前 10 个 IP 地址。
  • Q3:对于作者和编程语言的每一个组合,将该语言在作者是贡献者的任何 repo 中的所有字节相加,返回前100个 作者-语言 pair。

image

@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 17, 2021

The MilliSort algorithm

⚠️ 这部分和论文本身概念关系较小,简单了解即可

论文解释了自己的 MilliSort Example 的实现算法,看起来这个似乎和 flash burst 没有太大关系,实际上是为了更好的说明 flash burst 面临的各种通信上的开销以及怎么去更好的减少它们的代价。同样的,它介绍了一种新颖的 hierarchical 式的方法进行数据分区,从而达到高效的 coordination。

大多数分布式排序算法都使用分区方法,MilliSort 也是。首先决定最终每个 server 上 key 的范围,对数据进行分区;然后在 server 之间对记录进行 shuffle,以实现所选分区。这种方法通过在 shuffle 阶段只传输每条记录一次优化带宽的使用。Partitioning 的意义在于优化了带宽的使用,而网络带宽历来是分布式排序中最稀缺的资源。其他方法,比如 multi-stage merge sorts 的方法,需要在网络层面上多次传输数据,因此它们比分区方法慢。

更准确的说,MilliSort 是分布式桶排序的变种,它由四个阶段组成:

  1. Local sort:每个 server 对初识数据进行本地排序;
  2. Partition bucket boundaries:所有 server 共同确定排序后最终在每个 server 上的 key 的范围;
  3. Shuffle:每个 server 将 key 和 value 传输到对应的目标 server 上;
  4. Local rearrangement:在 shuffle 阶段到达每个 server 的数据必须重新排列成两个完全排序的数组,一个是 key,一个是 value(如前面 section 介绍,它们分布在不同的内存块上)。

接下来将从 partitioning 阶段开始介绍,从算法角度来看,这是最复杂也是最有趣的阶段。

Histogram sort

直方图排序是基于分区的排序算法,并被广泛使用,它通过对现有 partition 进行迭代细化,计算出最终的 key 的范围,直到 key均匀分布在服务器上。直方图排序的典型工作流程如下。

一开始,一个中心 server 挑选 M-1 个 splitter,将 key 划分为 M 个桶,并将它们广播给其他 server。然后,每个 server 计算出其 key 在桶中的本地直方图,并将其发回给中心服务器。最后,中央服务器通过将本地直方图相加计算出一个全局直方图,并调整 splitter 以减少不平衡。重复进行直方图的绘制和细化过程,直到实现均匀的分割。

然而,直方图排序对于MilliSort来说并不可取,因为它需要多次迭代才能收敛,而且每次迭代都会产生大量的消息延迟。为了避免中心服务器的过载,直方图排序经常使用组通信来 broadcast splitter,并在树结构中减少本地直方图。因此,每次迭代都会产生2log(M)的消息延迟。在论文的环境中,对于 100 台服务器,broadcast 和 gather 的综合成本至少为 50μs。仅仅 10 次迭代,仅消息延迟就会占用我们 1ms 时间预算的一半。

Sample sort partitioning

MilliSort 采用了不同的分区方法,选择了较多的初始值,目的是在一次迭代中估计出分布情况。MilliSort 的分区算法是基于有规律的抽样排序,基本思想是从起始数据中选择许多 key,用这些 key 来估计 key 的分布,并根据估计的分布选择分区边界。下图显示了基本思路。

image

在对其本地数据进行排序后,每台 server 在排序记录中以等间距的间隔对其 key 进行采样;将这些采样称为 pivot。所有 server 的 pivot 都会被收集并排序(更多细节见下文)。最后从排序后的 pivot 中选择 splliter 。如果有 M 台机器参与排序,则选择 M-1 个 splitter,将排序后的 pivot 分成 M 个大小相等的组。splitter 决定了在 shuffle 阶段 server 之间如何划分数据:server i 最终将持有所有数据大于或等于第 i 个 splitter 而小于第 i+1 个 splitter 的数据。

由于这种方法使用的是抽样,所以不能保证每个 server 在排序结束时最终得到的记录数量完全相同。如果总共有 N 条记录分给 M 台 server,而每台 server 选择 sM 个 pivot。对于MilliSort,使用s=1(每台机器选择 M 个 pivot),采用这种方法,需要排序的 pivot 总数为sM^2。这意味着随着 server 数量的增加,即使所有 server 都分担了工作,分区也会花费越来越多的时间。鉴于排序的时间有限,分区成本将限制可以利用的机器数量。

Recursive partitioning

执行 partitioning 的一种方法是是将所有的 pivot 聚集在一个 coordinator server 上,在该 server 上进行本地排序,然后将 splitter 广播回所有的 server。但是,这种方法对于 MilliSor t来说效率太低。如果有 300 台机器参与分拣,就会有 9 万个 pivots(正如我们上个 section 介绍的那样);同时在前面一毫秒可以做什么那个 section 中介绍过,单台 server 在一毫秒内只能排序约 4 万个 key,所以光排序就需要 2 毫秒以上。在单个 server 上接收所有的 pivot 的开销也是个问题。因此,毫秒级的排序需要以分布式的方式进行分区

MilliSort 采用递归的方式进行分区:使用较小的 MilliSort 实例以分布式方式对 pivot 进行排序,如下图所示。

image

集群中的一个子集,称为 pivot sorters,对 pivot 进行排序并选择 splitter;其他 server 中的都被分配给一台 pivot sorters。为了开始 sorting,每个 pivot sorters 从被分配的那些 server 那里收集 pivot。因为从每台 server 到达的 pivot 已经进行了排序,因此 pivot sorters 可以对到达的数据使用归并排序,以产生一个由它负责的所有 pivot 的排序列表。然后,每个 pivot sorters 对其 pivot 进行采样,以选择数量较少的第 2 级 pivot;第 2 级 pivot 被传递给 coordinator,使其对它们进行 sorting,并产生一组第 2 级 spliiter。Coordinator 将第2级 spillter 广播回 pivot sorters,然后执行 shuffle,将 pivot 按排序顺序在 pivot sorters 之间重新分配。

此时,pivot 已被排序,必须选择 splitter(即必须在所有的 pivot sorters 中选择第 sM 个 pivot)。我们希望每个 pivot sorters 都能独立地从其 pivot 中选择 splitter,但要做到这一点,pivot sorters 必须知道它的等级(即存储在其他 server 的 pivot 有多少比它存储的 pivot 小)。由于 pivot 并不是均匀地分布在各个 pivot sorters 上,因此排名并不是一目了然的。解决方法是在 pivot 排序的 shuffle 阶段分配等级信息。当一个 pivot sorters 在 shuffle 过程中向另一个 pivot sorters 发送一组 pivot 时,它包括了该组的本地排名。每个 pivot sorters 都可以通过将其收到的所有 shuffle 消息中的本地排名相加来确定其排名。一旦一个 pivot sorter 知道它的等级,它就可以识别它所存储的 splitter。

最后,一旦确定了 splitter,就必须将它们传播给所有参与排序的 server 。一种方法是让每个 pivot sorter 将其 spliiter 广播给所有的 M 台 server;但是,这样做的成本很高,因为会产生大量的消息。相反,MilliSort 使用两步法来分发 splitter。第一步,使用 all-gather 操作在 pivot sorters 之间交换 spliiter,使每个 pivot sorter 拥有所有的 M-1 个 splitter。然后,每个 pivot sorters 将完整的 pivot 广播给所有分配给它的 server。

如果 server 的数量非常多,上述的2级方法仍然会花费太长的时间。如果是这种情况,可以在分区中使用附加级别。例如 3 级,以此类推。用 r 来表示递归排序每一级的 reduction 因子,对于 M 个总机器来说,将有 M/r 个 pivot sorter,M/r^2 个二级 pivot sorter,以此类推。对于 MilliSort 的实现,论文使用的 reduction 因子为10。

@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 17, 2021

性能评测

最后就是相应的性能测试了,当然它的实验环境本身很好,这也是它前面算法的优化手段并没有导致更差性能的原因,因为网络足够快:

image

论文评估上面提到的两个 example 从而尝试回答几个问题:

  • 在一毫秒(或十毫秒)内可以处理多少数据,各自可以有效地利用多少台服务器?
  • 如果增加或减少时间预算,应用行为会发生怎样的变化?
  • 哪些因素限制了应用的性能和可扩展性,这些应用对底层 infra 造成了哪些压力?

整体性能如何?

image

1 毫秒的时间预算对大多数应用来说更具挑战性。只有 MilliQuery Q1 可以使用所有的 server,其他应用的 server 数量在60-140个之间。在 1 毫秒内处理的数据量在不同的应用中变化很大,从 MilliQuery Q3 的最低 34K到 Q1 的最高 48M。

image

图(a)显示,除了 Q1 之外,所有的 benchmark 都随着时间预算从 1 毫秒增加到 2 毫秒而呈现出二次方或更好的增长。图(b)显示了最佳集群大小作为时间预算的函数。除了 Q1 之外,其他都表现出几乎线性的扩展。图(c)显示了随着时间预算的增加,每台 server 数据集大小的缩放。在固定的初始开销之后,除了Q3之外,其他所有的都至少线性地缩放。

什么限制了 scalability?

从下表可以看出,Partitioning 和 Shuffle 是主要的开销:

image

如果集群规模翻倍,这两项所花费的时间也会翻倍,尽管每台 server 仍然处理大致相同数量的数据。partitioning 和 shuffle 消耗的时间占总运行时间的比例从 63% 增加到 77%。同时,在两种配置中,其他阶段的综合成本几乎保持不变。

Shuffle 的效率

Shuffle 对可扩展性影响很大,特别是在 1 ms 的时间尺度上。Q1,没有 shuffle,可以利用更多的 server,处理多得多的数据,而Q3,有三个 shuffle,可扩展性最差。MilliSort 和 Q2 各使用一次 shuffle,介于两者之间。在上表中,当 server 数量翻倍,同时固定每台 server 的数据量,85% 的时间增加来自于额外的 shffule 成本。

为了更好地了解 shuffle 成本,我们运行了一个独立的 shuffle benchmark,其中一组 server 中的每一台都向其他 server 发送固定大小的消息。下图是以每台 server 的吞吐量为单位的 shuffle 性能图。每台 server 的理想吞吐量是25 Gbps。

可以观察到两个不同的潜在原因,导致更多的 server 有更高的 shuffle 时间。首先,随着消息大小的减小,由于每条消息的 overheads,效率会下降。在 120 台 server 和 0.96M 总数据的情况下,shuffle 的平均消息大小约为 6.7KB,仍能提供良好的吞吐量。但在 240 台 server 和 1.92M 数据量的情况下,shuffle 的平均消息大小下降到 3.3KB 左右,导致每台 server 的吞吐量不到10Gbps。

其次,即使是大消息,每台 server 的吞吐量也会随着集群规模的增加而下降。这是因为集群网络使用的是 flow-consistent 的负载均衡,而不是数据包级的负载均衡。随着大量的主动传输,路径在链路使用上会发生冲突,导致这些链路的拥塞和其他链路的使用率不足。随着集群规模的增大,shuffle 会消耗更多的核心带宽,这使得拥塞更容易发生,而 shuffle 方法无法完全补偿。因此,shuffle 无法利用网络提供的全部 bi-section 带宽。使用更细化的网内负载均衡技术,如可能有助于在未来消除这一瓶颈。

@linxuyalun
Copy link
Owner Author

linxuyalun commented Apr 18, 2021

洞见

细粒度数据分布很重要

细粒度分布数据很重要,可以让人们利用更多的 CPU 和网络带宽来加速计算和通信。因此,未来的分布式数据并行系统将需要针对每台服务器较小的数据进行大尺度的架构优化。

高效的 Group communication 很重要

即使是简单的 dwarf 内部也有复杂的通信模式;除了 MilliQuery Q1 之外,所有的 benchmark 都严重依赖 group 通信。即使经过精心优化,在 MilliSort、MilliQuery Q2 和 MilliQuery Q3 的最佳配置中,它们分别占到了整体运行时间的 50%~60%、35%~40% 和 50%~65%。

per-message overhead 是关键的

众所周知,网络带宽和延迟对于大规模系统的性能影响很大,对于 flash burst 来说,它们仍然很重要。flash burst 不同于传统的大规模系统, per-message overhead 的重要性至少与传统指标一样重要。这是因为 group communication 在_大集群规模的小时间尺度下运行时_,往往会产生许多 small message。

Coordination 必须是分层结构的

在 HPC 社区中,通信必须采用分层结构来处理大群集规模。因此,group communication 通常以树状或 hypercube topology 结构实现,因此每台机器只与其少量的 server 进行通信。这有助于减轻 per-message overhead,并利用更多的聚合网络带宽。这个原则也适用于 Coordination。

Low-latency shuffle 很难

在所有的 group communication 中,Shuffle 是 flash burst 中最具挑战性的。这有几个原因。首先,由于 shuffle 需要每个 server 发送许多 small message,因此 per-message overhead 变得至关重要。Shuffle 的分层方法一般不实用,因为它会使网络带宽利用率增加一倍。其次,Shuffle 需要使用每台 server 的全部网络带宽以获得最佳性能,但许多网络并不提供全部的 bisection bandwidth,尤其是在大规模的情况下。即使在提供全 bisection bandwidth 的情况下,由于网络本身(因为路由)或应用中的对分匹配不完善,也会出现短暂的拥塞。

无限扩展的极限

如果在固定数据集大小的同时增加机器数量,效率就会不可避免地下降;这最终会使我们无法利用更多的 server 来加快工作速度。虽然一个并行程序的理论速度最终会受到其串行部分的限制(阿姆达尔定律)。但在 flash burst 的场景下,还有两个因素导致了效率的下降。首先,coordination 成本上升。这主要是由于 per-message overhead(例如,更高的 shuffle 成本)。Coordination 算法也需要更长的运行时间(例如,MilliSort 的 partition 时间随着 server 的增加而线性增加)。其次,当 server 较多而每台 server 的数据较少时,落后者效应更为显著(例如,在MilliQuery Q1中,当从 40 台 server 扩展到 280 台时,整体效率下降了约50%)。

@linxuyalun linxuyalun added done Finish reading and removed wip Read in progress labels Apr 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
data Big data computing framework done Finish reading novel This paper is novel and cool
Projects
None yet
Development

No branches or pull requests

1 participant