【大数据】SparkSql连接查询中的谓词下推处理(一)
转自:vivo互联网技术
作者:李勇
1.SparkSql
SparkSql是架构在Spark计算框架之上的分布式Sql引擎,使用DataFrame和DataSet承载结构化和半结构化数据来实现数据复杂查询处理,提供的DSL 可以直 接使用scala语言完成Sql查询,同时也使用thriftserver提供服务化的Sql查询功能。SparkSql提供了DataSource API,用户通过这套API可以自己开发一套Connector,直接查询各类数据源,数据源包括NoSql、RDBMS、搜索引擎以及HDFS等分布式文件系统上的文件等。
2.连接查询和连接条件
Sql中的连接查询(join),主要分为内连接查询 (inner join)、外连接查询 (outter join)和半连接查询 (semi join),具体的区别可以参考wiki的解释。
连接条件(join condition),则是指当这个条件满足时两表的两行数据才能"join"在一起被返回,例如有如下查询:
其中的"LT.id=RT.idAND LT.id>1"这部分条件被称为"join中条件",直接用来判断被join的两表的两行记录能否被join在一起,如果不满足这个条件,两表的这两行记录并非全部被踢出局,而是根据连接查询类型的不同有不同的处理,所以这并非一个单表的过滤过程或者两个表的的“联合过滤”过程;而where后的"RT.id>2"这部分被称为"join后条件",这里虽然成为"join后条件",但是并非一定要在join后才能去过滤数据,只是说明如果在join后进行过滤,肯定可以得到一个正确的结果,这也是我们后边分析问题时得到正确结果的基准方法。
3.谓词下推
所谓谓 词(pred icate),英文定义是这样的:A predicate is a function that returns bool (or something that can be implicitly converted to bool),也就是返回值是true或者false的函数,使用过scala或者spark的同学都知道有个filter方法,这个高阶函数传入的参数就是一个返回true或者false的函数。
但是如果是在sql语言中,没有方法,只有表达式。where后边的表达式起的作用正是过滤的作用,而这部分语句被sql 层解 析处理后,在数据库内部正是以谓词的形式呈现的。
那么问题来了,谓词为什么要下 推呢? SparkSql中的谓 词下 推有两层含义,第一层含义是指由谁来完成数据过滤,第二层含义是指何时完成数据过滤。要解答这两个问题我们需要了解Spark Sql的Sql语句处理逻辑,大致可以把Spark Sql中的查询处理流程做如下的划分:
如上图,Spark Sql会先对输入的Sql语句进行一系列的分析(Analyse),包括词法解析、语法分析以及语义分析;然后是执行计划的生成,包括逻辑计划和物理计划。其中在逻辑计划阶段会有很多的优化,对谓词的处理就在这个阶段完成;而物理计划则是Spark core 的RDD DAG图的生成过程;这两步完成之后则是具体的执行了(也就是各种重量级的计算逻辑,例如join、groupby、filter以及distinct等)。能够完成数据过滤的主体有两个,第一是分布式 Sql层 (在execute 阶 段),第二个是 数据源。那么谓词下推的第一层含义就是指由 Sql层的 Filter操作符来完成过滤,还是由Scan 操作在扫描阶段完成过滤。上边提到,我们可以通过封装SparkSql的Data Source API完成各类数据源的查询,那么如果底层 数据源无法高效完成数据的过滤,就会执行全扫描,把每条相关的数据都交给SparkSql的Filter操作符完成过滤,虽然SparkSql使用的Code Generation技术极大的提高了数据过滤的效率,但是这个过程无法避免磁 盘读 取大 量数 据,甚至在某些情况下会涉及网络IO(例如数据非本地化存储时);如果底层数据源在进行扫描时能非常快速的完成数据的过滤,那么就会把过滤交给底层数据源来完成,至于哪些数据源能高效完成数据的过滤以及SparkSql又是如何完成高效数据过滤的则不是本文讨论的重点,会在其他系列的文章中讲解。
那么谓 词 下 推第二层含义,即何时完 成数 据过滤则一般是在指连接查询中,是先对单表 数 据进行过 滤再和其他表连 接还是在先把多表进行连接再对连 接后的临 时表进 行过滤
4.内连接查询中的谓词下推规则
假设我们有两张表,表结构很简单,数据也都只有两条,但是足以讲清楚我们的下推规则,两表如下,一个lefttable,一个righttable:
4.1.Join后条件通过 AND 连接
先来看一条查询语句:
这个查询是一个内连接查询,join后条件是用and连接的两个表的过滤条件,假设我们不下推,而是先做内连接判断,这时是可以得到正确结果的,步骤如下:
1) 左表id为1的行在右表中可以找到,即这两行数据可以"join"在一起
2) 左表id为2的行在 右表中可以找到,这两行也可以"join"在一起
至此,join的临时结 果表(之所以是临时表,因为还没有进行过滤)如下:
然后使用where条件 进行过滤 ,显然临时表中的第一行不满足 条件,被过滤掉,最后结果如下:
来看看先进行谓词 下推的情况。先对两表进行 过滤,过滤的结果分别如下:
然后再对这两个过滤后的表进行内连接处理,结果如下:
可见,这和先进行 join 再过滤得到的结果一致。
4.2.Join后条件 通过 OR 连接
再来看一条查询语句:
我们先进 行join处 理,临时 表的结果如下:
然后使用where条件进行过滤,最终查询结果如下:
如果我们先使用where条件后每个表各自的过滤条件进行过滤,那么两表的过滤结果如下:
然后对这两个临时表进行内连接处理,结果如下:
表格有问题吧,只有字段名,没有字段值,怎么回事?是的,你没看错,确实没有值,因为左表过滤结果只有id为1的行,右表过滤结果只有id为2的行,这两行是不能内连接上的,所以没有结果。
那么为什么where条 件中两表的条件被or连 接就会出现错误的查询结果呢?分析原因主要是因为,对于or两侧的过滤条件,任何一个满足条件即可以返 回TRUE,那么对于"LT.value = 'two' OR RT.value = 'two' "这个查询条件,如果使用LT.value='two'把只有LT.value为'two'的左表记录过滤出来,那么对于左表中LT.value不为two的行,他们可能在跟右表使用id字段连接上之后,右表的RT.value恰好为two,也满足"LT.value = 'two' OR RT.value = 'two' ",但是可惜呀可惜,这行记录因为之前的粗暴处理,已经被过滤掉,结果就是得到了错误的查询结果。所以这种情况下谓词是不能下推的。
但是OR连接两 表join后条件也有两个例外,这里顺便分析第一个例外。第一个例外是过滤条件字段恰好为Join字段,比如如下的查询:
在这个查询中,join后条件依然是使用OR连接两表的过滤条件,不同的是,join中条件不再是id相等,而是value字段相等,也就是说过滤条件字段恰好就是join条件字段。大家可以自行采用上边的分步法分析谓词下推和不下推时的查询结果,得到的结果是相同的。我们来看看上边不能下推时出现的情况在这种查询里会不会出现。对于左表,如果使用LT.value='two'过滤掉不符合条件的其他行,那么因为join条件字段也是value字段,说明在左表中LT.value不等于two的行,在右表中也不能等于two,否则就不满足"LT.value=RT.value"了。这里其实有一个条件传递的过程,通过join中条件,已经在逻辑上提前把两表整合成了一张表。
至于第二个例外,则涉及了SparkSql中的一个优化,所以需要单独介绍。
4.3.分区表使 用OR连 接过滤条件
如果两个表都是分区表,会出现什么情况呢?我们先来看如下的查询:
此时左 表和右 表都不再是普通的表,而是分区表,分区字段是pt,按照日期进行数据分区。同时两表查询条件依然使用OR进行连接。试想,如果不能提前对两表 进行过滤,那么会有非常巨量的数据要首先进 行连 接处理,这个代价是非 常大的。但是如果按照我们在2中的分析,使用OR连 接两 表的过滤条件,又不能随意的进行谓词下推,那要如何处理呢?SparkSql在这里 使用了一种叫做“分区裁剪”的优化手段,即把分区并不看做普通的过滤条件,而是使用了“一刀切”的方法,把不符合查询分区条件的目录直接排除在待扫描的目录之外。我们知道分区表在HDFS上是按照目录来存储一个分区的数据的,那么在进行分区裁剪时,直接把要扫描的HDFS目录通知Spark的Scan操作符,这样,Spark在进行扫描时,就可以直接咔嚓掉其他的分区数据了。但是,要完成这种优化,需要SparkSql的语义分析逻辑能够正确的分析出Sql语句所要表达的精确目的,所以分区字段在SparkSql的元数据中也是独立于其他普通字段,进行了单独的标示,就是为了方便语义分析逻辑能区别处理Sql语句中where条件里的这种特殊情况。