RDD

共享内存模型

  • 只读的记录分区的集合

依赖关系

  • 窄依赖 narrow dependency
    • OneToOneDependency
    • 父 RDD 每个分区只被子 RDD 的一个分区所使用
    • 不需要 shuffle
    • map,union
  • 宽依赖 wide dependency/shuffle
    • 父 RDD 的每个分区可能被多个子 RDD分区所使用,会有 shuffle 产生
    • groupByKey

Partitioner 分区器

  • 定义如何分布数据
    • 一个 RDD 分成多少个分区,每个分区数据量多发,从而决定每个 Task 将处理哪些数据
  • 可使用分区器
    • [[HashPartitioner]] 给定的 key,计算 hashCode,对分区个数取余
    • [[RangePartitioner]] 尽量保证每个分区中的数据量均匀,且分区与分区之间是有序的。
      • rangeBounds
    • 自定义分区器

Question

  • RDD是弹性数据集,“弹性”体现在哪里呢?
    • 存储弹性
      • spark 计算产生的中间结果会保存在内存中,如果内存不足会自动存储在磁盘
    • 容错弹性
      • 计算过程中如果出错会自动重试
        • task 失败会重试
        • stage 失败会重试失败的分片
    • 计算弹性
      • 如果计算过程中数据丢失,会根据 RDD 的依赖关系重新计算得到数据
    • 分区弹性
      • RDD 会根据文件大小动态分区
  • 你觉得RDD有哪些缺陷?
    • 惰性计算,中间数据默认不保存,每次操作都会对数据集重复计算,某些计算量比较大的操作可能会影响系统的运行效率。
  • RDD分区和数据块有啥联系?

RDD 持久化

持久化指将数据进行保存,避免数据丢失。

RDD 持久化并非将数据落盘,而是缓存数据,供后续计算使用。

Cache()

  • 底层是 persist(),没有指定参数,默认 MEMORY_ONLY

persist()

  • 使用指定的方式进行持久化
  • StorageLevel.
    • MEMORY_ONLY
      • 内存优先
      • RDD 分区空间不够,旧的分区会直接删除
    • MEMORY_AND_DISK_SER
      • 优先内存,内存不足到磁盘。
      • 节省重新计算的开销
    • MEMORY_ONLY_SER
      • 在内存存放序列化后的数据
        • 序列化存储能减少内存开销,反序列化会增大 cpu 开销
    • DISK_ONLY
  • SER
    • 序列化保存

Checkpoint

  • 将 RDD 中间结果二进制形式写入磁盘
  • 使用
    • sc.setCheckpointDir(“hdfs://hadoop102:9820/output/a”)
    • rdd.checkpoint()
    • 手动释放rddx.unpersist(true)

Question

  • RDD 持久化方式?#card
    • ((66b0ee63-1749-4898-b4d9-5ab20b6d0d2d))
    • ((66b0ee63-e35a-4fcc-8279-e5a6303b2d22))
  • memory_only 如果内存存储不了,会怎么操作?#card
    • 利用 [[LRU]] 的缓存策略把最老的分区从内存中移除
    • 下一次使用被移除的分区需要重新计算

repartition 和 coalesce 对比

Spark中repartition和coalesce 相同点 → 都是调整分区的方法

Spark中repartition和coalesce 区别 → repartition 默认有 shuffle 操作,coalesce 使用 hash paritioner 重新 shuffle 数据

什么情况使用 coalesce 调整分区 → filter 之后收缩分区

  • 为什么 → coalesce 不需要 shuffle

Pair RDD

数据分区

  • 直行聚合或分组操作时,可以给定 saprk 的分区数

Spark API

SQL

  • Hive
  • 执行流程
    • SparkSqlParser
      • AST 语法树
        • 优化理论
    • Analyzer
      • 数据源绑定 & 字段类型确定
    • Optimizer
      • RBO 优化
    • SparkPlanner
      • 执行计划 Join 选择策略

Spark Streaming

MLlib

GraphX


Spark 内存模块

运行在 JVM 体系上,内存模型基于 Java 虚拟机

堆、栈、静态代码块和全局空间

Executor

  • 申请内存包括下面两部分 M1+M2
    • OnHeap 堆内内存 spark.executor.memory
    • OffHeap 堆外内存 spark.yarn.executor.memoryOverhead
      • spark.memory.offHeap.size
  • Task Memory Manager
    • E 的线程共享 JVM 资源,没有强隔离

内存划分

  • Storage:RDD缓存、Broadcast 数据空间
  • Execution:Shuffle 过程使用的内存
    • 第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;
      • JVM
    • 第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;
      • 通过 spark.shuffle.memoryFraction 控制大小
    • 第三块是让RDD持久化时使用,默认占Executor总内存的60%。
      • persisit() 或 cache()
      • 通过 spark.storage.memoryFraction 控制大小
      • 超过限制,旧分区会被移除内存
  • Other:用户定义的数据结构、Spark 内部元数据

spark-submit

spark-submit --queue root.queue_name --executor-cores 2 --num-executors 400

num-executors:多少个 Executor 节点来执行

executor-memory

executor-cores:executor CPU 核心数

  • core 同一时间执行一个 task
  • 同一个 executor 中的 core 共享 executor-memory
    • 如果任务占用内存比较多,调小 cores 数,可以使用更多内存

driver-memory

  • collect 算子将 RDD 拉取到 Driver 处理需要避免 OOM

spark.yarn.executor.memoryOverhead executor 额外预留的内存

spark.default.parallelism

  • 每个 stage 默认 task 数量 500-1000
  • 设置该参数为num-executors * executor-cores的2~3倍较为合适

spark.dynamicAllocation.minExecutors 以及 spark.dynamicAllocation.maxExecutors

  • 运行时动态分配 core 数

((2182abe5-07fb-43d0-bfb9-0245f1507b89))

((dc10cfe7-cf86-4356-a483-32f76b84d560))


SparkSession

Spark 2.0 引入概念,为用户提供统一的切入点来使用 Spark

[[pyspark]] 例子

1
2
3
4
5
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Word Count") \
.config(conf=c) \
.getOrCreate()

Scala 例子

  • 创建
    • val sparkSession = SparkSession.builder().appName(appName).enableHiveSupport().getOrCreate()
  • sql
    • sparkSession.sql(sqlStr)
  • HiveContext
    • spark sql 支持 hive 读写
    • new HiveContext(SparkSession)
  • SparkContext
    • sparkSession.sparkContext

Broadcast

sc.Broadcast(rdd.collectAsMap())

算子函数使用外部变量,默认情况 Spark 会将该变量复制多个副本,通过网络传输到 task 中,每个 task 都有一个变量副本。

  • 如果变量本身比较大,会占用增大网络中传输的性能开销,以及在各个节点的 Executor 占用过多内存导致频繁 GC。
  • 通过 Broadcast 的变量只在 Executor 内存中保留一份。
    • Executor 会有对应的 BlockManager,BlockManager 负责管理 Executor 对应的内存和磁盘上的数据。
    • 需要使用广播变量时,先尝试从 BlockManager 获取。如果失败, BlockManager 会从 Driver 或者 其他节点的 BlockManager 拉取变量副本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

//如果是变量是 hdfs 文件,先 collect
sc.broadcast(link2NodeMap.collectAsMap())

Spark/Broadcast

sc.Broadcast(rdd.collectAsMap())

算子函数使用外部变量,默认情况 Spark 会将该变量复制多个副本,通过网络传输到 task 中,每个 task 都有一个变量副本。

  • 如果变量本身比较大,会占用增大网络中传输的性能开销,以及在各个节点的 Executor 占用过多内存导致频繁 GC。

  • 通过 Broadcast 的变量只在 Executor 内存中保留一份。

    • Executor 会有对应的 BlockManager,BlockManager 负责管理 Executor 对应的内存和磁盘上的数据。

    • 需要使用广播变量时,先尝试从 BlockManager 获取。如果失败, BlockManager 会从 Driver 或者 其他节点的 BlockManager 拉取变量副本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

//如果是变量是 hdfs 文件,先 collect
sc.broadcast(link2NodeMap.collectAsMap())