10、Spark-Shuffle

🏷️ 365bet苹果app 📅 2026-01-30 02:54:10 👤 admin 👀 4089 ⭐ 310
10、Spark-Shuffle

本文来自拉钩教育

Spark 中 DAG 依赖可以划分成宽依赖和窄依赖。

宽依赖还有个名字,叫 Shuffle 依赖,也就是说宽依赖必然会发生 Shuffle 操作,Shuffle 也是划分 Stage 的依据。

窄依赖由于不需要发生 Shuffle,所有计算都是在分区所在节点完成,它类似于 MapReduce 中的 ChainMapper。

Spark Shuffle

很多算子都会引起 RDD 中的数据进行重分区,新的分区被创建,旧的分区被合并或者被打碎,在重分区的过程中,如果数据发生了跨节点移动,就被称为 Shuffle。

在 Spark 中,Shuffle 负责将 Map 端(这里的 Map 端可以理解为宽依赖的左侧)的处理的中间结果传输到 Reduce 端供 Reduce 端聚合(这里的 Reduce 端可以理解为宽依赖的右侧),它是 MapReduce 类型计算框架中最重要的概念,同时也是很消耗性能的步骤。

与 MapReduce 的 Sort-based Shuffle 不同,Spark 对 Shuffle 的实现方式有两种:Hash Shuffle 与 Sort-based Shuffle,这其实是一个优化的过程。在较老的版本中,Spark Shuffle 的方式可以通过 spark.shuffle.manager 配置项进行配置,而在最新的 Spark 版本中,已经去掉了该配置,统一称为 Sort-based Shuffle。

Hash Shuffle

在 Spark 1.6.3 之前, Hash Shuffle 都是 Spark Shuffle 的解决方案之一。 Shuffle 的过程一般分为两个部分:Shuffle Write 和 Shuffle Fetch,前者是 Map 任务划分分区、输出中间结果,而后者则是 Reduce 任务获取到的这些中间结果。Hash Shuffle 的过程如下图所示:

在图中,Shuffle Write 发生在一个节点上,该节点用来执行 Shuffle 任务的 CPU 核数为 2,每个核可以同时执行两个任务,每个任务输出的分区数与 Reducer(这里的 Reducer 指的是 Reduce 端的 Executor)数相同,即为 3,每个分区都有一个缓冲区(bucket)用来接收结果,每个缓冲区的大小由配置 spark.shuffle.file.buffer.kb 决定。这样每个缓冲区写满后,就会输出到一个文件段(filesegment),而 Reducer 就会去相应的节点拉取文件。这样的实现很简单,但是问题也很明显。主要有两个:

生成的中间结果文件数太大。理论上,每个 Shuffle 任务输出会产生 R 个文件( R为Reducer 的个数),而 Shuffle 任务的个数往往由 Map 任务个数 M 决定,所以总共会生成 M * R 个中间结果文件,而往往在一个作业中 M 和 R 都是很大的数字,在大型作业中,经常会出现文件句柄数突破操作系统限制。

缓冲区占用内存空间过大。单节点在执行 Shuffle 任务时缓存区大小消耗为 m * R * spark.shuffle.file.buffer.kb,m 为该节点运行的 Shuffle 任务数,如果一个核可以执行一个任务,m 就与 CPU 核数相等。这对于动辄有 32、64 物理核的服务器来说,是比不小的内存开销。

consolidation的优化(减少中间文件 )

为了解决第一个问题, Spark 推出过 File Consolidation 机制,旨在通过共用输出文件以降低文件数,如下图所示:

每当 Shuffle 任务输出时,同一个 CPU 核心处理的 Map 任务的中间结果会输出到同分区的一个文件中,然后 Reducer 只需一次性将整个文件拿到即可。这样,Shuffle 产生的文件数为 C(CPU 核数)* R。 Spark 的 FileConsolidation 机制默认开启,可以通过 spark.shuffle.consolidateFiles 配置项进行配置。

Sort-based Shuffle

每个 Map 任务会最后只会输出两个文件(其中一个是索引文件),其中间过程采用的是与 MapReduce 一样的归并排序,但是会用索引文件记录每个分区的偏移量,输出完成后,Reducer 会根据索引文件得到属于自己的分区,在这种情况下,Shuffle 产生的中间结果文件数为 2 * M(M 为 Map 任务数)。

总结一下

类型

文件数量

Hash Shuffle

一个 Map 生成多个文件(由 Reduce 数量决定),总数量为:Map 数 * Reduce 数

Hash Shuffle + consolidation

一个 CPU 生成多个文件(由 Reduce 数量决定),总数量为: CPU 核数 * Reduce 数

Sort-based Shuffle

每个 Map 任务会最后只会输出两个文件(其中一个是索引文件),总数量为: 2 * Map 数

相关推荐 ✨

约彩365官方下载安装 云顶之弈大师晋级宗师规则
nowgoal365live score AKG崛起,未来抗衰黑马你了解了吗?
约彩365官方下载安装 認識鉋刀用木~台灣鉋刀使用樹材
nowgoal365live score 《媚肉之香》1 2补丁说明和使用方法
365bet苹果app 问保时捷981价格大概多少钱
nowgoal365live score 横店影视城

横店影视城

📅 01-07 👀 393