从开发层面上优化Spark作业

概述

        当开发人员通过Spark官方文档上的API来编写Spark作业时,将会遇到transformation, action和RDD这样的单词。对于开发人员,理解这种级别的Spark概念是至关重要的。比方说,当Spark作业失败,或者通过Spark UI界面来理解为什么作业运行需要那么长时间,将会遇到一些新的Spark概念,job, stage和task。了解这个级别的Spark对于编写好的Spark程序来说是至关重要的,当然,好的,我的意思是很快。 要编写一个有效执行的Spark程序,理解Spark的底层执行模型是非常有用的。

Spark作业执行流程

        Spark应用程序由一个Driver进程和一组分布在集群节点上的Executor进程组成。
        Spark Driver程序负责向Executor调度任务,Executor负责具体Task的执行,存储用户选择缓存的那些数据。逻辑图如图所示

        在Spark应用程序中调用一个action会触发Spark作业的启动。 为了规定出Spark作业任务执行流程,Spark检查该行为所依赖的RDD并且会通过DAG来描述Spark作业的样子。这个工作是从最后面的RDD开始的,也就是那些不依赖于其他RDD或引用已经被缓存的数据的RDD。
        DAG由一系列Stages组成,每个Stage会细分为task,task的个数由stage的分区数来决定,同一个Stage中的每个task的执行逻辑是相同的。
        什么决定了数据是否需要shuffle? 回想一下,RDD包含固定数量的分区,每个分区包含多个记录。 对于像map和filter这样所谓的窄变换所返回的RDD,计算单个分区中的记录所需的记录驻留在父RDD中的单个分区中。 每个对象只依赖于父对象中的一个对象。 像coalesce这样的操作可能会导致一个任务处理多个输入分区,但是转换仍然被认为是狭窄的,因为用于计算任何单个输出记录的输入记录仍然只能位于分区的有限子集中。
        但是,Spark也支持具有宽依赖的转换,如groupByKey和reduceByKey。 在这些依赖关系中,计算单个分区中的记录所需的数据可能驻留在父RDD的多个分区中。 具有相同key的所有元组都必须在相同的分区中,由同一个任务处理。 为了满足这些操作,Spark必须执行一个shuffle,这个shuffle将在集群周围传输数据,并产生一个新的分区集。
        在Stage的边界上,子Stage中的task将会通过网络来获取父Stage中task执行之后的数据。由于这将会带来严重的disk I/O和网络带宽,所以Stage边界代价是极高的,应该尽量避免。子Stage中的分区数可能与父stage中的分区数不相同。触发Stage边界的Transformations操作可以通过numPartitions参数来指定子Stage中的分区数。
        就像我们在调优MR作业时通过改变reducer的个数一样,Spark中调整Stage边界分区数也能很大的提高应用程序性能。

1 避免使用shuffle类算子

        如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

        shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

        因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast与map进行join代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

如下面代码

1
2
3
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

        对于reduceByKey这样的算子当没有指定分区器的时候就会采用默认的hash分区器,上述代码段会发生两次shuffle过程,对于join算子,当两个RDD为相同的分区器和分区数时,将不会发生额外的shuffle。因为这里的 RDD 使用相同的 hash 方式进行 partition,所以全部 RDD 中同一个 partition 中的 key的集合都是相同的。因此,rdd3中一个 partiton 的输出只依赖rdd2和rdd1的同一个对应的 partition,所以第三次shuffle 是不必要的。
        如果 rdd1 和 rdd2 在 reduceByKey 时使用不同的 partitioner 或者使用相同的 partitioner 但是 partition 的个数不同的情况,那么只有一个 RDD (partiton 数更少的那个)需要重新 shuffle。

什么时候shuffle多性能更优

        尽可能减少 shuffle 的准则也有例外的场合。如果额外的 shuffle 能够增加并发那么这也能够提高性能。比如当你的数据保存在几个没有切分过的大文件中时,那么使用 InputFormat 产生分 partition 可能会导致每个 partiton 中聚集了大量的record,如果 partition 不够,导致没有启动足够的并发。在这种情况下,我们需要在数据载入之后使用 repartiton (会导致shuffle)提高 partiton 的个数,这样能够充分使用集群的CPU。
另外一种例外情况是在使用 recude 或者 aggregate action 聚集数据到 driver 时,如果数据把很多 partititon 个数的数据,单进程执行的 driver merge 所有 partition 的输出时很容易成为计算的瓶颈。为了缓解 driver 的计算压力,可以使用reduceByKey 或者 aggregateByKey 执行分布式的 aggregate 操作把数据分布到更少的 partition 上。每个 partition中的数据并行的进行 merge,再把 merge 的结果发个 driver 以进行最后一轮 aggregation。查看 treeReduce 和treeAggregate 查看如何这么使用的例子。
这个技巧在已经按照 Key 聚集的数据集上格外有效,比如当一个应用是需要统计一个语料库中每个单词出现的次数,并且把结果输出到一个map中。一个实现的方式是使用 aggregation,在每个 partition 中本地计算一个 map,然后在 driver中把各个 partition 中计算的 map merge 起来。另一种方式是通过 aggregateByKey 把 merge 的操作分布到各个partiton 中计算,然后在简单地通过 collectAsMap 把结果输出到 driver 中。

2 数据序列化

        序列化在任何一种分布式应用性能优化时都扮演几位重要的角色。如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。 通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库:

  1. Java serializationSpark默认的序列化机制,只有实现了java.io.Serializable接口的对象,都能被序列化。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。
  2. Kryo serialization Spark还可以使用Kryo库(版本2)提供更高效的序列化格式。Kryo的序列化速度和字节占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有实现了Serializable 接口的类型,它需要你在程序中register 需要序列化的类型,以得到最佳性能。

对于Spark作业,会有下列三个地方涉及序列化:

  1. 对于Spark的算子闭包引用外面的变量时;
  2. 自定义的类型作为RDD的泛型类型时;
  3. 使用序列化的持久策略;

        对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

1
2
3
4
5
6
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

3 数据本地化

                数据本地性对Spark作业往往会有较大的影响。如果代码和其所操作的数据在同一节点上,那么计算速度肯定会更快一些。但如果二者不在一起,那必然需要移动其中之一。一般来说,移动序列化好的代码肯定比挪动一大堆数据要快。Spark就是基于这个一般性原则来构建数据本地性的调度。
数据本地性是指代码和其所处理的数据的距离。基于数据当前的位置,数据本地性可以划分成以下几个层次(按从近到远排序)

  • PROCESS_LOCAL 数据和运行的代码处于同一个JVM进程内;
  • NODE_LOCAL 数据和代码处于同一节点。例如,数据处于HDFS上某个节点,而对应的执行器(executor)也在同一个机器节点上。这会比PROCESS_LOCAL稍微慢一些,因为数据需要跨进程传递;
  • NO_PREF 数据在任何地方处理都一样,没有本地性偏好;
  • RACK_LOCAL 数据和代码处于同一个机架上的不同机器。这时,数据和代码处于不同机器上,需要通过网络传递,但还是在同一个机架上,一般也就通过一个交换机传输即可;
  • ANY 数据在网络中未知,即数据和代码不在同一个机架上。

        Spark倾向于让所有任务都具有最佳的数据本地性,但这并非总是可行的。某些情况下,可能会出现一些空闲的执行器(executor)没有待处理的数据,那么Spark可能就会牺牲一些数据本地性。有两种可能的选项:a)等待已经有任务的CPU,待其释放后立即在同一台机器上启动一个任务;b)立即在其他节点上启动新任务,并把所需要的数据复制过去。
  通常,Spark会等待一会,看看是否有CPU会被释放出来。一旦等待超时,则立即在其他节点上启动并将所需的数据复制过去。数据本地性各个级别之间的回落超时可以单独配置,也可以在统一参数内一起设定;详细请参考 configuration page 中的 spark.locality 相关参数。如果你的任务执行时间比较长并且数据本地性很差,你就应该试试调大这几个参数,不过默认值一般都能适用于大多数场景了。

4 复用RDD

        通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。
        首先要避免创建重复RDD。否则导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。
        之后在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。
        最后我们还应该对多次使用的RDD进行缓存持久化。Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。
        因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作

Spark的持久化级别

持久化级别 含义解释
MEMORY_ONLY 默认的缓存策略,仅仅会缓存到内存,内存不够就不缓存
MEMORY_AND_DISK 先缓存到内存,内存不够之后缓存到内存
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. _2代表副本

持久化策略的选择

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

5 广播大变量

        有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
        在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。
        因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
代码如下

1
2
3
4
5
6
7
8
9
10
11
12
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)