Spark 3.0如何提高SQL工作负载的性能

发布日期:2021-01-05 11:22
574e9258d109b3de78706d36e010b687820a4cd3
在几乎所有处理复杂数据的部门中,Spark很快已成为跨数据和分析生命周期的团队的事实上的分布式计算框架。 新的Adaptive Query Execution框架(AQE)是Spark 3.0最令人期待的功能之一,它可以解决困扰许多Spark SQL工作负载的问题。英特尔和百度混合团队在2018年初的博客中记录了这些内容。要更深入地了解框架,请学习我们更新的Apache Spark Performance Tuning课程。
我们在Workload XM方面的经验无疑证实了这些问题的现实性和严重性。
AQE最初是在Spark 2.4中引入的,但随着Spark 3.0的发展,它变得更加强大。尽管Cloudera建议在我们交付Spark 3.1之前等待在生产中使用它,但您现在可以使用AQE开始在Spark 3.0中进行评估。 
首先,让我们看一下AQE解决的问题类型。

初始催化剂设计中的缺陷
下图表示使用DataFrames执行简单的按组分组查询时发生的分布式处理的类型。

Spark为第一阶段确定适当的分区数量,但对于第二阶段,使用默认的幻数200。
不好的原因有三个:
  1. 200不可能是理想的分区数,而分区数是影响性能的关键因素之一;
  2. 如果将第二阶段的输出写入磁盘,则可能会得到200个小文件。
  3. 优化及其缺失会产生连锁反应:如果在第二阶段之后继续进行处理,您可能会错过进行更多优化的潜在机会。
您可以做的是在执行类似于以下语句的查询之前,手动为此shuffle设置此属性的值:

spark.conf.set(“ spark.sql.shuffle.partitions”,“ 2”)

这也带来了一些挑战:
  • 在每次查询之前都要设置此属性
  • 这些值将随着数据的发展而过时
  • 此设置将应用于查询中的所有Shuffle操作
在上一个示例的第一阶段之前,数据的分布和数量是已知的,Spark可以得出合理的分区数量值。但是,对于第二阶段,此信息尚不知道要获得执行第一阶段的实际处理所要付出的代价:因此,求助于幻数。 

自适应查询执行设计原理
AQE的主要思想是使执行计划不是最终的,并允许在每个阶段的边界进行审核。因此,执行计划被分解为由阶段界定的新的“查询阶段”抽象。
催化剂现在停在每个阶段的边界,以根据中间数据上可用的信息尝试并应用其他优化。
因此,可以将AQE定义为Spark Catalyst之上的一层,它将动态修改Spark计划。
有什么缺点吗?有一些,但它们很小:
  • 执行在Spark的每个阶段边界处停止,以查看其计划,但这被性能提升所抵消。
  • Spark UI更加难以阅读,因为Spark为给定的应用程序创建了更多的作业,而这些作业不会占用您设置的Job组和描述。


Shuffle分区的自适应数目
自Spark 2.4起,AQE的此功能已可用。
要启用它,您需要将spark.sql.adaptive.enabled设置为true ,该参数默认值为false 。启用AQE后,随机调整分区的数量将自动调整,不再是默认的200或手动设置的值。
这是启用AQE之前和之后第一个TPC-DS查询的执行结果:


动态将排序合并联接转换为广播联接
当任何联接端的运行时统计信息小于广播哈希联接阈值时,AQE会将排序合并联接转换为广播哈希联接。
这是启用AQE之前和之后第二个TPC-DS查询执行的最后阶段:


动态合并shuffle分区

动态合并shuffle分区

如果随机播放分区的数量大于按键分组的数量,则由于键的不平衡分配,会浪费很多CPU周期
当两个 

spark.sql.adaptive.enabled
spark.sql.adaptive.coalescePartitions.enabled

设置为true ,Spark将根据以下内容合并连续的shuffle分区
设置为spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小,以避免执行过多的小任务。


动态优化倾斜的连接
倾斜是分布式处理的绊脚石。它实际上可能会使您的处理暂停数小时:

如果不进行优化,则执行连接所需的时间将由最大的分区来定义。

因此,倾斜联接优化将使用spark.sql.adaptive.advisoryPartitionSizeInBytes指定的值将分区A0划分为子分区,并将它们中的每一个联接到表B的对应分区B0。

因此,您需要向AQE提供您的倾斜定义。
这涉及两个属性:
  1. spark.sql.adaptive.skewJoin.skewedPartitionFactor是相对的:如果分区的大小大于此因子乘以中位数分区大小且也大于,则认为该分区是倾斜的
  2. spark.sql.adaptive.skewedPartitionThresholdInBytes ,这是绝对的:这是阈值,低于该阈值将被忽略。
动态分区修剪
动态分区修剪(DPP)的想法是最有效的优化技术之一:仅读取所需的数据。DPP不是AQE的一部分,实际上,必须禁用AQE才能进行DPP。从好的方面来说,这允许将DPP反向移植到Spark 2.4 for CDP。
该优化在逻辑计划和物理计划上均实现。
  1. 在逻辑级别上,识别维度过滤器,并通过连接传播到扫描的另一侧。
  2. 然后,在物理级别上,过滤器在维度侧执行一次,结果被广播到主表,在该表中也应用了过滤器。


如果禁用spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly,则DPP实际上可以与其他类型的联接一起使用(例如,SortMergeJoin)。 
在那种情况下,Spark会估计DPP过滤器是否真正提高了查询性能。
DPP可以极大地提高高度选择性查询的性能,例如,如果您的查询从5年的数据中的一个月中筛选出来。

并非所有查询的性能都有如此显着的提高,但是在99个TPC-DS查询中,有72个受到DPP的积极影响。

结论
Spark距其最初的核心范例还有很长的路要走:在静态数据集上懒惰地执行优化的静态计划。
静态数据集部分受到流技术的挑战:Spark团队首先创建了一个基于RDD的笨拙设计,然后提出了一个涉及DataFrames的更好的解决方案。
静态计划部分受到SQL和Adaptive Query Execution框架的挑战,从某种意义上说,结构化流对于初始流库是什么:它应该一直是一个优雅的解决方案。
分享到:
推荐精彩博文