Spark Shuffle

shuffle 过程发生在不同 stage 之间
- 前一个 stage 的 ShuffleMapTask 进行 shuffle write,把数据存储在 blockManager 上,把数据位置元信息上报到 driver 的 mapOutTrack 中
- 后一个 stage 根据数据位置元信息,进行 shuffle read,拉取上个 stage 的输出数据
shuffle 操作必须要落盘,所以操作性能低
分布在多个节点的同一个 key,拉取到同一个节点上,进行聚合或 join 操作。
- 相同的 key 会写到本地磁盘,然后其他节点通过网络传输拉取各个节点磁盘上相同的 key
- 处理key过多,导致内存不够存放,进而溢写到磁盘文件中。
- 大量磁盘文件读写 IO 以及数据网络传输
不同 ShuffleWriter
- BypassMergeSortShuffleWriter
- SortShuffleWriter
- 聚合算子:边聚合边写入内存
- 普通算子:直接写内存
- UnsafeShuffleWriter
- 序列化器KryoSerializer
- 直接在 serialized binary data 上 sort 而不是 java objects,减少了 memory 的开销和 GC 的 overhead
触发 Shuffle 的操作
- repartition 相关
- repartition
- coalesce
- ByKey
- groupByKey
- reduceByKey
- combineByKey
- groupByKey 和 reduceByKey 的底层实现
- 大概实现逻辑
- 遇到新 key 执行 createCombiner
- 遇到已有 key 执行 mergeValue
- 对所有分区执行 mergeCombiners
- aggregateByKey
- join 相关
- cogroup
- join
[[Spark 调优]]
- 减少 shuffle 次数
- 必要时主动 shuffle 改变并行度
- 使用 treeReduce & treeAggregate 替换 reduce & aggregate
[[Spark 数据倾斜]]
- shuffle 默认使用 [[HashPartitioner]] 对数据进行分片,可能造成不同的 key 分配到一个 task 上
spark.default,parallelism指定默认并行度
网络回响
Spark Shuffle