Pair RDD

数据分区

  • 直行聚合或分组操作时,可以给定 saprk 的分区数

RDD 持久化

持久化指将数据进行保存,避免数据丢失。

RDD 持久化并非将数据落盘,而是缓存数据,供后续计算使用。

Cache()

  • 底层是 persist(),没有指定参数,默认 MEMORY_ONLY

persist()

  • 使用指定的方式进行持久化

  • StorageLevel.

    • MEMORY_ONLY

      • 内存优先

      • RDD 分区空间不够,旧的分区会直接删除

    • MEMORY_AND_DISK_SER

      • 优先内存,内存不足到磁盘。

      • 节省重新计算的开销

    • MEMORY_ONLY_SER

      • 在内存存放序列化后的数据

        • 序列化存储能减少内存开销,反序列化会增大 cpu 开销
    • DISK_ONLY

  • SER

    • 序列化保存

Checkpoint

  • 将 RDD 中间结果二进制形式写入磁盘

  • 使用

    • sc.setCheckpointDir(“hdfs://hadoop102:9820/output/a”)

    • rdd.checkpoint()

    • 手动释放rddx.unpersist(true)

Question

  • RDD 持久化方式? #card

    • Cache()

    • persist()

  • memory_only 如果内存存储不了,会怎么操作? #card

    • 利用 [[LRU]] 的缓存策略把最老的分区从内存中移除

    • 下一次使用被移除的分区需要重新计算


RDD

共享内存模型

  • 只读的记录分区的集合

依赖关系

  • 窄依赖 narrow dependency

    • OneToOneDependency

    • 父 RDD 每个分区只被子 RDD 的一个分区所使用

    • 不需要 shuffle

    • map,union

  • 宽依赖 wide dependency/shuffle

    • 父 RDD 的每个分区可能被多个子 RDD分区所使用,会有 shuffle 产生

    • groupByKey

Partitioner 分区器

  • 定义如何分布数据

    • 一个 RDD 分成多少个分区,每个分区数据量多发,从而决定每个 Task 将处理哪些数据
  • 可使用分区器

    • [[HashPartitioner]] 给定的 key,计算 hashCode,对分区个数取余

    • [[RangePartitioner]] 尽量保证每个分区中的数据量均匀,且分区与分区之间是有序的。

      • rangeBounds
    • 自定义分区器

Question

  • RDD是弹性数据集,“弹性”体现在哪里呢?

    • 存储弹性

      • spark 计算产生的中间结果会保存在内存中,如果内存不足会自动存储在磁盘
    • 容错弹性

      • 计算过程中如果出错会自动重试

        • task 失败会重试

        • stage 失败会重试失败的分片

    • 计算弹性

      • 如果计算过程中数据丢失,会根据 RDD 的依赖关系重新计算得到数据
    • 分区弹性

      • RDD 会根据文件大小动态分区
  • 你觉得RDD有哪些缺陷?

    • 惰性计算,中间数据默认不保存,每次操作都会对数据集重复计算,某些计算量比较大的操作可能会影响系统的运行效率。
  • RDD分区和数据块有啥联系?


Spark API

SQL

  • Hive

  • 执行流程

+ SparkSqlParser

  + AST 语法树

    + 优化理论

+ Analyzer

  + 数据源绑定 & 字段类型确定

+ Optimizer

  + RBO 优化

+ SparkPlanner

  + 执行计划 Join 选择策略

Spark Streaming

MLlib

GraphX


Spark 内存模块

运行在 JVM 体系上,内存模型基于 Java 虚拟机

堆、栈、静态代码块和全局空间

Executor

  • 申请内存包括下面两部分 M1+M2

+ OnHeap 堆内内存 spark.executor.memory

+ OffHeap 堆外内存 spark.yarn.executor.memoryOverhead

  + spark.memory.offHeap.size
  • Task Memory Manager

    • E 的线程共享 JVM 资源,没有强隔离

内存划分

  • Storage:RDD缓存、Broadcast 数据空间

  • Execution:Shuffle 过程使用的内存

    • 第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;

      • JVM
    • 第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;

      • 通过 spark.shuffle.memoryFraction 控制大小
    • 第三块是让RDD持久化时使用,默认占Executor总内存的60%。

      • persisit() 或 cache()

      • 通过 spark.storage.memoryFraction 控制大小

      • 超过限制,旧分区会被移除内存

  • Other:用户定义的数据结构、Spark 内部元数据


SparkSession

Spark 2.0 引入概念,为用户提供统一的切入点来使用 Spark

[[pyspark]] 例子

1
2
3
4
5
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Word Count") \
.config(conf=c) \
.getOrCreate()

Scala 例子

  • 创建

    • val sparkSession = SparkSession.builder().appName(appName).enableHiveSupport().getOrCreate()
  • sql

    • sparkSession.sql(sqlStr)
  • HiveContext

    • spark sql 支持 hive 读写

    • new HiveContext(SparkSession)

  • SparkContext

    • sparkSession.sparkContext

Spark/Broadcast

sc.Broadcast(rdd.collectAsMap())

算子函数使用外部变量,默认情况 Spark 会将该变量复制多个副本,通过网络传输到 task 中,每个 task 都有一个变量副本。

  • 如果变量本身比较大,会占用增大网络中传输的性能开销,以及在各个节点的 Executor 占用过多内存导致频繁 GC。

  • 通过 Broadcast 的变量只在 Executor 内存中保留一份。

    • Executor 会有对应的 BlockManager,BlockManager 负责管理 Executor 对应的内存和磁盘上的数据。

    • 需要使用广播变量时,先尝试从 BlockManager 获取。如果失败, BlockManager 会从 Driver 或者 其他节点的 BlockManager 拉取变量副本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

//如果是变量是 hdfs 文件,先 collect
sc.broadcast(link2NodeMap.collectAsMap())

repartition 和 coalesce 对比

coalesce

repartition

Spark中repartition和coalesce 相同点 :-> 都是调整分区的方法
Spark中repartition和coalesce 区别 :-> repartition 默认有 shuffle 操作,coalesce 使用 hash paritioner 重新 shuffle 数据
什么情况使用 coalesce 调整分区 :-> filter 之后收缩分区
card-last-score:: 5
card-repeats:: 1
card-next-schedule:: 2022-10-22T02:05:11.083Z
card-last-interval:: 4

  • 为什么 :-> coalesce 不需要 shuffle

spark-submit

spark-submit --queue root.queue_name --executor-cores 2 --num-executors 400

num-executors:多少个 Executor 节点来执行

executor-memory

executor-cores:executor CPU 核心数

  • core 同一时间执行一个 task

  • 同一个 executor 中的 core 共享 executor-memory

    • 如果任务占用内存比较多,调小 cores 数,可以使用更多内存

driver-memory

  • collect 算子将 RDD 拉取到 Driver 处理需要避免 OOM

spark.yarn.executor.memoryOverhead executor 额外预留的内存

spark.default.parallelism

  • 每个 stage 默认 task 数量 500-1000

  • 设置该参数为num-executors * executor-cores的2~3倍较为合适

spark.dynamicAllocation.minExecutors 以及 spark.dynamicAllocation.maxExecutors

  • 运行时动态分配 core 数

spark.driver.maxResultSize设置 Executor 端发回数据量

降低 spark.memory.fraction