www.g22.com - 宝马娱乐平台

搜索: 您的位置首页 > 在线留言

Spark 执行模型与性能调优文末留言免费获取《Spark:大数据集群计算的生产实践

时间:2018-09-15 22:04:23  来源:本站  作者:

  原标题:Spark 执行模型与性能调优,文末留言免费获取《Spark:大数据集群计算的生产实践》

  本文节选自《Spark:大数据集群计算的生产实践》,关注 iteblog_hadoop 公众号并在本文末评论区留言(认真写评论,增加上榜的机会)。留言点赞数排名前5名的粉丝,各免费赠送一本《Spark:大数据集群计算的生产实践》,活动截止至06月21日18:00。

  在深入探讨Spark应用的性能改善之前,有必要先了解Spark在集群上分布式执行程序的基础知识。当运行一个Spark应用时,driver进程会随着集群worker节点上的一系列executor进程一起启动。driver负责运行用户的应用程序,当有action被触发时driver负责管理所需执行的所有工作。另一方面,executor进程以任务(task)的形式执行实际的工作以及保存结果。但是,这些任务是如何分配给executor的呢?

  对于Spark应用内部触发的每个action,DAG调度器都会创建一个执行计划来完成它。执行计划就是将尽可能多的窄依赖(narrow dependency)转换(transformation)装配到各步骤(stage)中。RDD间的窄依赖是指父RDD的每一个分区最多能被一个子RDD的分区使用。当有一些宽依赖需要做shuffle操作时,stage就受限制了。当多个子RDD的分区使用同一个父RDD的分区时,RDD间就会产生宽依赖(见图3-1)。

  代码筛选出偶数,然后乘以3并通过collect操作将结果返回。由于它们输入分区的数据没有分发到多个输出分区,因而都是窄转换,所以它们都将在同一个stage中执行。

  另外,下面的代码对文件里的单词进行计数,筛选出现过10次的单词,然后对这些单词中的每个字符出现的次数进行计数。最后,通过collect action操作触发job的执行。这些转换中有两个是stage边界(它们有宽依赖)。代码中的两个reduceByKey转换是生成3个stage的原因:

  在定义3个stage之后,调度器将启动一个task,计算出最终RDD对应的各个分区。因此,事实上stage是一组任务,对数据的不同子集执行相同转换(transformation)。任务调度器将基于可用资源及数据局部性把这些任务分配给executor。

  例如,如果需要转换的分区已经在某个特定节点的内存里,则任务的执行将被发送到该节点。

  根据上一节的描述,可以推断出RDD分区方式能极大地影响执行计划的创建方式,因此也会间接影响性能。现在我们看看分区是如何影响Spark应用性能的。

  分区(partition)其实就是RDD中的数据被切分后形成的片段。当DAG调度器将job转换为stage时,每个分区将被处理成一个task,每个task需要一个CPU核来执行。这意味着Spark应用的并行度取决于RDD的分区数。因此,不难理解对Spark应用性能进行调优时,RDD的分区数可能是需要考虑的最重要的事情。

  RDD的分区数与其创建方式高度相关。从文件创建的RDD都有默认的分区数。例如,如果文件存储在HDFS上,分区数将等于文件块数目(一个文件块对应一个分区)。这意味着可以通过在HDFS上写文件时的块大小,或者通过配置InputFormat创建的分片(split)的多少,来控制分区数。

  你也可以通过并行化集合来创建RDD。在本例中,默认的分区数是由spark.default.parallelism属性决定的。这个默认值由集群管理器决定:对于运行在local模式的Spark 1.5.2来说,其值为CPU核的数目;对于细粒度模式的Mesos来说,其值为8;在其他情况下,分区数取2与所有executor上的CPU核总数的最大值。

  但是,你可以控制这些默认值。对这两种创建RDD的方式,可以通过一个用户输入参数来控制分区的数量:

  最常见的创建RDD的方式,是对已有的RDD进行一些转换操作(transformation)。通常,一个RDD的分区数与它所依赖的RDD的分区数相同。然而,有一些转换,例如“union”(合并)就不受此规则的制约,因为它创建的RDD其分区数等于父RDD所有分区数的总和。

  我们来看另一种会引起数据shuffle的转换操作。这类转换都是宽依赖,即计算RDD的一个分区需要处理父RDD的多个分区的数据。这种情况下,如果不特别指定,默认分区数将是所依赖的RDD的最大分区数。且看Pair RDD上的一个groupByKey转换的示例:

  要想写一个高效的Spark应用程序,就必须设置最优的分区数。假设job生成的任务数少于可用的CPU数。这种情况下,可能面临两个性能问题:第一,不能充分发挥整体计算能力;第二,如果分区数量少,单个分区内的数据量将会比分区数量更多时大很多。对于更大的数据集,执行任务时还会有内存压力和垃圾回收的压力,导致运算速度减慢。

  同样,如果一个分区内的数据太大无法加载到内存,数据将不得不溢写到磁盘以避免出现out-of-memory异常。但是溢写到磁盘需要排序及磁盘I/O等操作,会带来巨大的开销。

  为充分利用集群的计算能力,分区数至少应当等于集群分配给应用程序的CPU数,但是分区过大的问题依然没有得到解决。如果你的数据集非常大,而集群又相当小,那么你的分区还是会过大。这种情形下,RDD的分区数必须远高于可用的CPU核数。

  另一方面,你还得考虑周全以防落入另一种极端:分区数过多。分区数过多将会生成许多需要发送到worker节点执行的小任务,这将增加任务调度的开销。不过,启动任务所带来的性能损失比数据溢写到硬盘的损失要小。如果有许多任务几乎瞬时完成,或者这些任务根本没有执行任何读/写操作,这就表明你的并行度太高了。

  对于RDD来说,很难计算出一个最佳的分区数,因为这很大程度上取决于数据集的大小、分区器(paritioner)本身,以及每个任务可用的总内存。为估算出一个比较精确的分区数,你需要了解你的数据及其分布情况。不过,建议把每个RDD的分区数量设置为CPU数的2到4倍。

  我们讨论了如何控制RDD的分区数量,但是数据在这些分区上是怎样分布的呢?为了让分区中的数据分散到集群上,Spark使用分区器(paritioner),目前有两种内置的分区器:HashParitioner和RangePartitioner。

  选择分区器的默认方式是,对这两个参数中的一个进行设置来决定使用何种分区器:

  HashPartitioner基于键(key)的哈希码(hash code)把值分布到各个分区上。通过计算键的哈希码与分区数的模,得到分区索引值,在计算中也需要考虑到哈希码的正负情况。

  RangePartitioner根据范围对可排序项进行分区。对RDD内容进行取样能决定大致的范围区间。最终的分区数可能小于配置的数。

  不过,也不是非得使用这些分区器,你可以自己开发一个。如果你对使用场景的领域知识非常了解,会非常有帮助。假设你有一个Pair RDD,其中键是文件系统上文件的路径。如果使用HashPartitioner,foler1/firstFileName.txt与folder1/secondFileName.png 可能结束于不同节点的不同分区。如果你想把同一文件夹的所有文件放在同一分区内,可以编写自己的分区器,基于父文件夹来分发文件。

  编写自定义分区器其实非常简单,只需要对分区器的org.apache.spark. Partitioner进行扩展,并实现下面的方法即可。

  equals and hashcode——用来将你的分区器同其他分区器进行比较的方法。

  一旦实现了自定义分区器,使用起来会非常简单。既可以把它传递给partitionBy函数,也可以传递给基于shuffle的函数。

  有一些操作,譬如map函数,也会影响分区。在map操作中,可以改变一个Pair RDD中的键,这样分区就会发生变化。这种情形下,生成的RDD将没有分区器集。不过,你可以从两种方式中(mapValues及flatMapValues)选择一种,对Pair RDD中的值进行map处理以保留分区器。

  【1】关注 iteblog_hadoop 公众号,并在评论区留言获点赞数最高前5名将赠送;《Spark:大数据集群计算的生产实践》1本,共送出5本;

  【3】活动结束后,收到中奖通知的用户请在公众号回复:微信号 + 姓名 + 地址+ 电话 + 邮编;

相关文章列表
    无相关信息
推荐资讯
栏目更新
热点排行