数据分区
- 直行聚合或分组操作时,可以给定 saprk 的分区数
数据分区
持久化指将数据进行保存,避免数据丢失。
RDD 持久化并非将数据落盘,而是缓存数据,供后续计算使用。
Cache()
persist()
使用指定的方式进行持久化
StorageLevel.
MEMORY_ONLY
内存优先
RDD 分区空间不够,旧的分区会直接删除
MEMORY_AND_DISK_SER
优先内存,内存不足到磁盘。
节省重新计算的开销
MEMORY_ONLY_SER
在内存存放序列化后的数据
DISK_ONLY
SER
Checkpoint
将 RDD 中间结果二进制形式写入磁盘
使用
sc.setCheckpointDir(“hdfs://hadoop102:9820/output/a”)
rdd.checkpoint()
手动释放rddx.unpersist(true)
RDD 持久化方式? #card
Cache()
persist()
memory_only 如果内存存储不了,会怎么操作? #card
利用 [[LRU]] 的缓存策略把最老的分区从内存中移除
下一次使用被移除的分区需要重新计算
共享内存模型
依赖关系
窄依赖 narrow dependency
OneToOneDependency
父 RDD 每个分区只被子 RDD 的一个分区所使用
不需要 shuffle
map,union
宽依赖 wide dependency/shuffle
父 RDD 的每个分区可能被多个子 RDD分区所使用,会有 shuffle 产生
groupByKey
Partitioner 分区器
定义如何分布数据
可使用分区器
[[HashPartitioner]] 给定的 key,计算 hashCode,对分区个数取余
[[RangePartitioner]] 尽量保证每个分区中的数据量均匀,且分区与分区之间是有序的。
自定义分区器
RDD是弹性数据集,“弹性”体现在哪里呢?
存储弹性
容错弹性
计算过程中如果出错会自动重试
task 失败会重试
stage 失败会重试失败的分片
计算弹性
分区弹性
你觉得RDD有哪些缺陷?
RDD分区和数据块有啥联系?
SQL
Hive
执行流程
+ SparkSqlParser
+ AST 语法树
+ 优化理论
+ Analyzer
+ 数据源绑定 & 字段类型确定
+ Optimizer
+ RBO 优化
+ SparkPlanner
+ 执行计划 Join 选择策略
Spark Streaming
MLlib
GraphX
运行在 JVM 体系上,内存模型基于 Java 虚拟机
堆、栈、静态代码块和全局空间
Executor
M1+M2
+ OnHeap 堆内内存 spark.executor.memory
+ OffHeap 堆外内存 spark.yarn.executor.memoryOverhead
+ spark.memory.offHeap.size
Task Memory Manager
内存划分
Storage:RDD缓存、Broadcast 数据空间
Execution:Shuffle 过程使用的内存
第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;
第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;
第三块是让RDD持久化时使用,默认占Executor总内存的60%。
persisit() 或 cache()
通过 spark.storage.memoryFraction 控制大小
超过限制,旧分区会被移除内存
Other:用户定义的数据结构、Spark 内部元数据
Spark 2.0 引入概念,为用户提供统一的切入点来使用 Spark
[[pyspark]] 例子
1 | from pyspark.sql import SparkSession |
Scala 例子
创建
val sparkSession = SparkSession.builder().appName(appName).enableHiveSupport().getOrCreate()
sql
sparkSession.sql(sqlStr)
HiveContext
spark sql 支持 hive 读写
new HiveContext(SparkSession)
SparkContext
sparkSession.sparkContext
sc.Broadcast(rdd.collectAsMap())
算子函数使用外部变量,默认情况 Spark 会将该变量复制多个副本,通过网络传输到 task 中,每个 task 都有一个变量副本。
如果变量本身比较大,会占用增大网络中传输的性能开销,以及在各个节点的 Executor 占用过多内存导致频繁 GC。
通过 Broadcast 的变量只在 Executor 内存中保留一份。
Executor 会有对应的 BlockManager,BlockManager 负责管理 Executor 对应的内存和磁盘上的数据。
需要使用广播变量时,先尝试从 BlockManager 获取。如果失败, BlockManager 会从 Driver 或者 其他节点的 BlockManager 拉取变量副本。
1 | // 以下代码在算子函数中,使用了外部的变量。 |
coalesce
repartition
Spark中repartition和coalesce 相同点 :-> 都是调整分区的方法
Spark中repartition和coalesce 区别 :-> repartition 默认有 shuffle 操作,coalesce 使用 hash paritioner 重新 shuffle 数据
什么情况使用 coalesce 调整分区 :-> filter 之后收缩分区
card-last-score:: 5
card-repeats:: 1
card-next-schedule:: 2022-10-22T02:05:11.083Z
card-last-interval:: 4
spark-submit --queue root.queue_name --executor-cores 2 --num-executors 400
num-executors:多少个 Executor 节点来执行
executor-memory
executor-cores:executor CPU 核心数
core 同一时间执行一个 task
同一个 executor 中的 core 共享 executor-memory
driver-memory
spark.yarn.executor.memoryOverhead executor 额外预留的内存
spark.default.parallelism
每个 stage 默认 task 数量 500-1000
设置该参数为num-executors * executor-cores
的2~3倍较为合适
spark.dynamicAllocation.minExecutors
以及 spark.dynamicAllocation.maxExecutors
spark.driver.maxResultSize
设置 Executor 端发回数据量
降低 spark.memory.fraction