[Spark Deep Dive] SparkSQL Join Selection Logic
SparkSQL join selection logic
Let's start with the doc comment of JoinSelection:
If it is an equi-join, we first look at the join hints w.r.t. the following order:
  1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
     have the broadcast hints, choose the smaller side (based on stats) to broadcast.
  2. sort merge hint: pick sort merge join if join keys are sortable.
  3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
     sides have the shuffle hash hints, choose the smaller side (based on stats) as the
     build side.
  4. shuffle replicate NL hint: pick cartesian product if join type is inner like.

If there is no hint or the hints are not applicable, we follow these rules one by one:
  1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
     is supported. If both sides are small, choose the smaller side (based on stats)
     to broadcast.
  2. Pick shuffle hash join if one side is small enough to build local hash map, and is
     much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
  3. Pick sort merge join if the join keys are sortable.
  4. Pick cartesian product if join type is inner like.
  5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
     other choice.
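In other words:
For an equi-join, join hints are checked first, in this order:
- broadcast hint: if the join type is supported, choose broadcast hash join; if both sides carry a broadcast hint, broadcast the smaller side (based on statistics)
- sort merge hint: if the join keys are sortable, choose sort merge join
- shuffle hash hint: if the join type is supported, choose shuffle hash join; if both sides carry the hint, the smaller side (based on statistics) becomes the build side
- shuffle replicate NL hint: if the join type is inner-like (inner or cross), choose cartesian product join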
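The hint priority can be sketched as a plain Scala function. This is a simplified model, not Spark's code. The hint names, the `Strategy` objects and the single `hashJoinTypeSupported` flag are hypothetical stand-ins: Spark checks join-type support separately for each side. The sketch also leaves out choosing the smaller side when both sides carry the same hint.

```scala
// Hypothetical model of JoinSelection's hint branch for equi-joins.
// Not Spark code: the names and the collapsed boolean flags are assumptions.
object HintSelectionSketch {
  sealed trait Hint
  case object BroadcastHint extends Hint
  case object SortMergeHint extends Hint
  case object ShuffleHashHint extends Hint
  case object ShuffleReplicateNLHint extends Hint

  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object SortMergeJoin extends Strategy
  case object ShuffledHashJoin extends Strategy
  case object CartesianProduct extends Strategy

  /** Tries hints in priority order; None means "no applicable hint, use the no-hint rules". */
  def selectByHint(
      hints: Set[Hint],
      hashJoinTypeSupported: Boolean, // assumption: one flag instead of per-side checks
      keysSortable: Boolean,
      innerLike: Boolean): Option[Strategy] =
    if (hints(BroadcastHint) && hashJoinTypeSupported) Some(BroadcastHashJoin)
    else if (hints(SortMergeHint) && keysSortable) Some(SortMergeJoin)
    else if (hints(ShuffleHashHint) && hashJoinTypeSupported) Some(ShuffledHashJoin)
    else if (hints(ShuffleReplicateNLHint) && innerLike) Some(CartesianProduct)
    else None
}
```

An inapplicable hint simply falls through to the next one. For example, a broadcast hint on a join type that cannot be broadcast still lets a lower-priority sort merge hint take effect.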
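If there is no hint, or the hints are not applicable, the strategies are tried in this order:
- broadcast hash join: one side is small enough to broadcast and the join type is supported; if both sides are small, broadcast the smaller one (based on statistics)
- shuffle hash join: one side is small enough to build a local hash map and is much smaller than the other side, and spark.sql.join.preferSortMergeJoin is set to false
- sort merge join: the join keys are sortable
- cartesian product: the join type is inner-like (inner or cross)
- broadcast nested loop join: the last resort; it may cause an OOM, but there is no other choice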
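Here is a minimal sketch of the no-hint order for an equi-join. It makes two assumptions. First, plain `Long` byte counts stand in for `plan.stats.sizeInBytes`. Second, a single `hashJoinTypeSupported` flag replaces Spark's per-side join-type checks.

`Conf` and its defaults are modeled on Spark's settings: a 10MB broadcast threshold, 200 shuffle partitions and `preferSortMergeJoin = true`. The object and method names are hypothetical, and side selection is omitted.

```scala
// Hypothetical, simplified model of the no-hint rules for an equi-join.
object NoHintSelectionSketch {
  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object ShuffledHashJoin extends Strategy
  case object SortMergeJoin extends Strategy
  case object CartesianProduct extends Strategy
  case object BroadcastNestedLoopJoin extends Strategy

  final case class Conf(
      autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024, // default "10MB"
      numShufflePartitions: Int = 200,                       // default spark.sql.shuffle.partitions
      preferSortMergeJoin: Boolean = true)                   // default spark.sql.join.preferSortMergeJoin

  /** Applies the rules one by one; sizes are estimated bytes of each side. */
  def select(leftSize: Long, rightSize: Long,
             hashJoinTypeSupported: Boolean, keysSortable: Boolean, innerLike: Boolean,
             conf: Conf = Conf()): Strategy = {
    def canBroadcast(size: Long) = size >= 0 && size <= conf.autoBroadcastJoinThreshold
    def canBuildLocalHashMap(size: Long) =
      size < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
    def muchSmaller(a: Long, b: Long) = a * 3 <= b
    // The build side must fit a local hash map and be at least 3x smaller than the other side.
    def shuffleHashOk(build: Long, other: Long) =
      canBuildLocalHashMap(build) && muchSmaller(build, other)

    if (hashJoinTypeSupported && (canBroadcast(leftSize) || canBroadcast(rightSize))) BroadcastHashJoin
    else if (hashJoinTypeSupported && !conf.preferSortMergeJoin &&
             (shuffleHashOk(rightSize, leftSize) || shuffleHashOk(leftSize, rightSize))) ShuffledHashJoin
    else if (keysSortable) SortMergeJoin
    else if (innerLike) CartesianProduct
    else BroadcastNestedLoopJoin
  }
}
```

With the defaults, a 50MB table joined to a 500MB table lands on sort merge join. Setting `preferSortMergeJoin` to false lets the 3x rule pick shuffle hash join instead.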
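Notes:
- Hash joins (broadcast hash join and shuffle hash join) only support equi-joins and do not support full outer join. This holds for the Spark version quoted here; starting with Spark 3.1, shuffle hash join can also handle full outer join.
- "Small enough to broadcast" means the estimated size is at most spark.sql.autoBroadcastJoinThreshold (default 10MB), which is defined as: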
val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
  .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
    "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
    "Note that currently statistics are only supported for Hive Metastore tables where the " +
    "command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
    "run, and file-based data source tables where the statistics are computed directly on " +
    "the files of data.")
  .version("1.1.0")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefaultString("10MB")
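- For shuffle hash join, one side must be much smaller than the other, where "much smaller" means at least 3x smaller in estimated bytes. The relevant helpers in JoinSelection are: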
/**
 * Matches a plan whose output should be small enough to be used in broadcast join.
 */
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

/**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
 * Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
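To make the thresholds concrete, here are the same three checks restated over plain `Long` byte counts. This is an assumption for illustration; Spark reads `plan.stats.sizeInBytes`, which is a `BigInt`.

With the defaults, "10MB" is 10 × 1024 × 1024 = 10,485,760 bytes. With the default spark.sql.shuffle.partitions of 200, the local hash map limit works out to 10,485,760 × 200 = 2,097,152,000 bytes, roughly 2GB. The object name is hypothetical.

```scala
// Standalone restatement of the JoinSelection size helpers over Long byte counts.
object JoinSizeHelpersSketch {
  val DefaultBroadcastThreshold: Long = 10L * 1024 * 1024 // "10MB" = 10485760 bytes
  val DefaultShufflePartitions: Int = 200                  // default spark.sql.shuffle.partitions

  /** Broadcastable if the size is known (non-negative) and within the threshold. */
  def canBroadcast(sizeInBytes: Long, threshold: Long = DefaultBroadcastThreshold): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold

  /** Each of the numShufflePartitions partitions should fit within the broadcast threshold. */
  def canBuildLocalHashMap(sizeInBytes: Long,
                           threshold: Long = DefaultBroadcastThreshold,
                           numShufflePartitions: Int = DefaultShufflePartitions): Boolean =
    sizeInBytes < threshold * numShufflePartitions

  /** a is "much smaller" than b if it is at least 3x smaller. */
  def muchSmaller(a: Long, b: Long): Boolean = a * 3 <= b
}
```

Setting the threshold to -1 makes `canBroadcast` false for every size, which is how -1 disables broadcasting.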
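Why does hash join only support equi-joins, and not full outer join?
- It comes down to how a hash map works. Its strength is O(1) random access by key, which only helps when the join condition is key equality. A non-equi condition, such as a range, would require iterating over the whole map, which throws away the performance benefit, so Spark does not do it.
- Full outer join is not supported because the small table is the build side. The build side is not streamed: its rows are only looked up passively when a stream row arrives. The join therefore never reaches a point where it decides whether to output a build row that found no match.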
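A toy hash join in plain Scala shows both points. The build side is grouped into a `Map` once, and each stream row then does a single key lookup. This is an illustration only, not Spark's `HashedRelation`, and `Row` is a hypothetical type.

```scala
// Toy in-memory hash join for illustration; not Spark's implementation.
object HashJoinSketch {
  final case class Row(key: Int, value: String) // hypothetical row type

  /** Inner join: build a hash map once, then probe it with each stream row. */
  def innerJoin(stream: Seq[Row], build: Seq[Row]): Seq[(Row, Row)] = {
    val hashed: Map[Int, Seq[Row]] = build.groupBy(_.key) // build phase
    for {
      s <- stream
      b <- hashed.getOrElse(s.key, Seq.empty) // probe phase: one O(1) lookup per stream row
    } yield (s, b)
  }

  /** Left outer join with the stream side on the left: unmatched stream rows get None. */
  def leftOuterJoin(stream: Seq[Row], build: Seq[Row]): Seq[(Row, Option[Row])] = {
    val hashed: Map[Int, Seq[Row]] = build.groupBy(_.key)
    stream.flatMap { s =>
      hashed.get(s.key) match {
        case Some(matches) => matches.map(b => (s, Option(b)))
        case None          => Seq((s, Option.empty[Row])) // detected as soon as the lookup fails
      }
    }
  }
}
```

The left outer join works because the outer side is the stream side, so an unmatched stream row is noticed the moment its lookup fails. Build rows that never match are never visited during probing. A full outer join would therefore need extra bookkeeping, such as tracking matched build keys, which this probe loop does not have. Likewise, a range condition could not use `hashed.get` at all and would have to scan every entry.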
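For reference, here is the basic SparkSQL join flow:
In Spark SQL, every join implementation follows the same basic flow. Depending on their role, the two tables in a join are called the "stream table" and the "build table". Spark SQL assigns these roles using specific strategies; by default, the larger table is usually the stream table and the smaller table the build table.
The stream table's iterator is StreamIterator and the build table's iterator is BuildIterator. The join walks through each record of StreamIterator and looks up matching records in BuildIterator; this lookup is called the Build process. Each Build step produces one JoinedRow(A, B). If A comes from StreamIterator and B from BuildIterator, this is a BuildRight operation. If B comes from StreamIterator and A from BuildIterator, it is a BuildLeft operation.
For LeftOuter, RightOuter and LeftSemi, the build side is fixed: LeftOuter and LeftSemi are BuildRight, and RightOuter is BuildLeft.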
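The role rules can be sketched as two small functions. This is a standalone model that borrows the names from the text; it does not use Spark's own build-side classes.

```scala
// Standalone model of build-side roles; not Spark's classes.
object BuildSideSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType

  /** Fixed build side for these join types: the outer/semi side is always streamed. */
  def fixedBuildSide(jt: JoinType): BuildSide = jt match {
    case LeftOuter | LeftSemi => BuildRight // right table is built, left is streamed
    case RightOuter           => BuildLeft  // left table is built, right is streamed
  }

  /** JoinedRow(A, B) is always (left, right), whichever side was streamed. */
  def joinedRow[T](streamRow: T, buildRow: T, side: BuildSide): (T, T) = side match {
    case BuildRight => (streamRow, buildRow) // A from stream, B from build
    case BuildLeft  => (buildRow, streamRow) // A from build, B from stream
  }
}
```

The outer side is always the stream side. LeftOuter streams the left table so that unmatched left rows can be emitted with nulls, and RightOuter mirrors that.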
Why is cartesian join restricted to inner-like joins?
def createCartesianProduct() = {
  if (joinType.isInstanceOf[InnerLike]) {
    Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
  } else {
    None
  }
}
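Among the JoinType classes, exactly two extend InnerLike: Inner (inner join) and Cross (cross join). A cartesian product only emits pairs of rows that exist on both sides, filtered by the optional condition, which is exactly inner/cross semantics. It has no step that emits a null-padded row for an unmatched row. Outer, semi and anti joins need different output rules, so they are not planned this way.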
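Here is a plain-Scala sketch of what a cartesian product computes. It assumes in-memory `Seq`s instead of RDD partitions. `planCartesian` is a hypothetical helper that only mirrors the `Some`/`None` shape of `createCartesianProduct`.

```scala
// Illustrative model of a cartesian product; not Spark's CartesianProductExec.
object CartesianSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType

  def isInnerLike(jt: JoinType): Boolean = jt == Inner || jt == Cross

  /** Every (l, r) pair, kept only if the optional condition holds. */
  def cartesian[L, R](left: Seq[L], right: Seq[R],
                      condition: Option[(L, R) => Boolean]): Seq[(L, R)] =
    for {
      l <- left
      r <- right
      if condition.forall(c => c(l, r))
    } yield (l, r)

  /** Only inner-like joins get a cartesian plan; anything else returns None. */
  def planCartesian[L, R](jt: JoinType, left: Seq[L], right: Seq[R],
                          condition: Option[(L, R) => Boolean]): Option[Seq[(L, R)]] =
    if (isInnerLike(jt)) Some(cartesian(left, right, condition)) else None
}
```

Every output row is a real (left, right) pair, so this operator has no way to produce the null-padded rows an outer join requires.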
object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")
      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

sealed abstract class JoinType {
  def sql: String
}

/**
 * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
 * indicating a cartesian product has been explicitly requested.
 */
sealed abstract class InnerLike extends JoinType {
  def explicitCartesian: Boolean
}

case object Inner extends InnerLike {
  override def explicitCartesian: Boolean = false
  override def sql: String = "INNER"
}

case object Cross extends InnerLike {
  override def explicitCartesian: Boolean = true
  override def sql: String = "CROSS"
}

case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}
...
}
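Why can Broadcast Nested Loop Join cause OOM?
Broadcast Nested Loop Join broadcasts one dataset and then compares rows in a nested loop. That makes it computationally very inefficient, since each stream row is checked against every broadcast row. It is also very memory-hungry: when it is chosen as the fallback, one dataset is broadcast to every executor no matter how large it is.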
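Here is a sketch of the nested loop, using a range condition similar to the cross join case study. `Task`, `Slot` and the integer time fields are hypothetical.

The point is the shape of the work. The broadcast side must be held in memory in full, here as a `Vector`. Every stream row is tested against every broadcast row, so the predicate runs |stream| × |broadcast| times.

```scala
// Hypothetical model of a broadcast nested loop join; not Spark's BroadcastNestedLoopJoinExec.
object NestedLoopJoinSketch {
  final case class Task(id: Int, start: Int, end: Int)      // hypothetical stream-side row
  final case class Slot(day: Int, beginTm: Int, endTm: Int) // hypothetical broadcast-side row

  /** Returns the matched pairs plus the number of predicate evaluations performed. */
  def join(stream: Iterator[Task], broadcast: Vector[Slot]): (Vector[(Task, Slot)], Long) = {
    var comparisons = 0L
    val out = Vector.newBuilder[(Task, Slot)]
    for (t <- stream; s <- broadcast) { // the whole broadcast side is scanned for every stream row
      comparisons += 1
      // Range (non-equi) condition: there is no key to hash or sort on.
      if (s.beginTm >= t.start && s.endTm < t.end) out += ((t, s))
    }
    (out.result(), comparisons)
  }
}
```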
Cross join optimization case study
select /*+ mapjoin(b)*/
a.*, sum(b.work_date) as '工作日'
from a
cross join
work_date_dim b
on b.begin_tm >= a.任务开始时间
and b.end_tm < a.任务结束时间
group by ...
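Without the mapjoin hint, the query was extremely slow. Table a had fewer than 100,000 rows and table b only a few thousand, yet the query still had not finished after 30 minutes.
With the mapjoin hint, it finished in under a minute. Keep in mind, though, that table b must not be too large. MAPJOIN is one of Spark's names for the broadcast hint, so b is sent to every executor. Because the join condition is a range predicate, the hinted join runs as a broadcast nested loop join.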