Architecture
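Parse the input SQL into an unresolved logical plan (see the SqlParser implementation for this step).
Transform the unresolved logical plan into a resolved logical plan (see the analysis implementation).
Transform the resolved logical plan into an optimized logical plan (see the optimize implementation).
Transform the optimized logical plan into a physical plan (see the QueryPlanner and its Strategy implementations).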
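The four steps form a chain in which each stage consumes the previous stage's output. The Scala sketch below makes that chain explicit under heavy simplifying assumptions. The plan types (`UnresolvedProject`, `ResolvedProject`, `PhysicalProject`), the regex-based parser, the in-memory `schemas` map and the single deduplication rewrite are all hypothetical. They are not Catalyst's classes or rules.

```scala
// A toy four-stage pipeline. Every type, the regex "parser" and the schema map
// are hypothetical stand-ins, not Catalyst classes.
object PipelineSketch {
  final case class UnresolvedProject(columns: Seq[String], table: String)
  final case class ResolvedProject(columns: Seq[(String, String)], table: String)
  final case class PhysicalProject(description: String)

  // Hypothetical schema source (plays the Catalog's role): table -> (column -> type).
  val schemas: Map[String, Map[String, String]] =
    Map("people" -> Map("name" -> "string", "age" -> "int"))

  // Only understands "SELECT c1, c2 FROM t"; a real parser is far more general.
  private val Select = """(?i)\s*SELECT\s+(.+?)\s+FROM\s+(\w+)\s*""".r

  // 1. Parse: SQL text -> unresolved plan (bare names, no types yet).
  def parse(sql: String): UnresolvedProject = sql match {
    case Select(cols, table) => UnresolvedProject(cols.split(",").map(_.trim).toSeq, table)
    case _                   => throw new IllegalArgumentException(s"cannot parse: $sql")
  }

  // 2. Analyze: attach a type to every column; unknown names are errors.
  def analyze(p: UnresolvedProject): ResolvedProject = {
    val schema = schemas.getOrElse(p.table, throw new NoSuchElementException(s"table ${p.table}"))
    val typed = p.columns.map { c =>
      c -> schema.getOrElse(c, throw new NoSuchElementException(s"column $c"))
    }
    ResolvedProject(typed, p.table)
  }

  // 3. Optimize: one toy rewrite that drops duplicate projected columns.
  def optimize(p: ResolvedProject): ResolvedProject = p.copy(columns = p.columns.distinct)

  // 4. Physical planning: describe a concrete operator tree.
  def plan(p: ResolvedProject): PhysicalProject = {
    val cols = p.columns.map { case (name, tpe) => s"$name:$tpe" }
    PhysicalProject(cols.mkString("Project[", ",", s"] <- Scan(${p.table})"))
  }

  // The whole pipeline is the composition of the four stages.
  def compile(sql: String): PhysicalProject = plan(optimize(analyze(parse(sql))))
}
```

For example, `PipelineSketch.compile("SELECT name, age, name FROM people").description` yields `Project[name:string,age:int] <- Scan(people)`. Giving each stage its own result type lets the compiler reject a pipeline that skips or reorders a step.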
Source Code Module
Rule
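Rule is an abstract class with a name, which defaults to the class name. There are many Rule implementations, spread across the different processing stages (analyze, optimize). RuleExecutor is the class that executes rules; two of its implementations are covered in detail below: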
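Here is a minimal sketch of the Rule contract, assuming a rule is simply a named `plan => plan` function. The `RuleSketch` object, its simplified `Rule[T]` and the `RemoveZeros` rule over a toy plan type (`List[Int]`) are hypothetical illustrations, not Spark's source. The default name is the runtime class name with the trailing `$` that Scala appends to objects stripped.

```scala
object RuleSketch {
  // Simplified stand-in for Catalyst's Rule: a named transformation T => T.
  // The name defaults to the class name, minus the "$" suffix of Scala objects.
  abstract class Rule[T] {
    val ruleName: String = getClass.getName.stripSuffix("$")
    def apply(plan: T): T
  }

  // Hypothetical rule over a toy "plan" (a list of ints): drop all zeros.
  object RemoveZeros extends Rule[List[Int]] {
    def apply(plan: List[Int]): List[Int] = plan.filterNot(_ == 0)
  }
}
```

Because a rule is just a function with a name, a RuleExecutor can apply a sequence of them with a fold and use the name when logging which rule changed the plan.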
RuleExecutor
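RuleExecutor supports two execution strategies: run once (Once) or run repeatedly (FixedPoint). The strategy controls when a batch stops iterating, i.e. when it converges. The name Strategy is a little misleading here, since all it specifies is a maximum number of iterations.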
/**
* An execution strategy for rules that indicates the maximum number of executions. If the
* execution reaches fix point (i.e. converge) before maxIterations, it will stop.
*/
abstract class Strategy { def maxIterations: Int }
/** A strategy that only runs once. */
case object Once extends Strategy { val maxIterations = 1 }
/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy
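The sketch below shows what Once and FixedPoint change in practice. It reuses the Strategy hierarchy quoted above and runs a single hypothetical rewrite, `stripOuterParens`, which removes one layer of parentheses per pass. The `run` driver is an assumption modeled on the loop in RuleExecutor.apply(). It compares values with `!=` (value equality) in place of fastEquals.

```scala
object StrategySketch {
  // The Strategy hierarchy as quoted in the text.
  abstract class Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  // Hypothetical driver modeled on RuleExecutor.apply(): apply `step` once, then
  // keep applying it while the budget allows and the value is still changing.
  // Returns the final value and the number of passes performed.
  def run[T](start: T, strategy: Strategy)(step: T => T): (T, Int) = {
    var last = start
    var cur = step(start)
    var iteration = 1
    while (iteration < strategy.maxIterations && cur != last) {
      last = cur
      cur = step(cur)
      iteration += 1
    }
    (cur, iteration)
  }

  // Hypothetical rewrite: strip one outer pair of parentheses per pass.
  // Naive on purpose: it assumes the first "(" matches the last ")".
  def stripOuterParens(s: String): String =
    if (s.length >= 2 && s.startsWith("(") && s.endsWith(")")) s.substring(1, s.length - 1) else s
}
```

With Once, `run("(((x)))", Once)(stripOuterParens)` stops after one pass at `((x))`. With `FixedPoint(10)` it reaches `x`, and the fourth pass is the one that observes no change. With `FixedPoint(2)` the budget runs out at `(x)` before the fixed point is reached.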
RuleExecutor's Batch class and batches field:
/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
/** Defines a sequence of rule batches, to be overridden by the implementation. */
protected val batches: Seq[Batch]
Each batch holds multiple Rules.
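In apply(), a plan is passed in and the batches run one after another. Within each batch the rules are applied with a left fold (foldLeft), and the final plan is returned.
A batch stops iterating (converges) once it has run its strategy's maximum number of iterations, or once an iteration leaves the plan unchanged (the new plan fastEquals the previous one). apply() works as follows: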
/**
* Executes the batches of rules defined by the subclass. The batches are executed serially
* using the defined execution strategy. Within each batch, rules are also executed serially.
*/
def apply(plan: TreeType): TreeType = {
var curPlan = plan
batches.foreach { batch =>
var iteration = 1
var lastPlan = curPlan
curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => rule(plan) }
// Run until fix point (or the max number of iterations as specified in the strategy).
while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
lastPlan = curPlan
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val result = rule(plan)
if (!result.fastEquals(plan)) {
logger.debug(...)
}
result
}
iteration += 1
}
}
curPlan
}
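The following is a self-contained miniature of the apply() loop above. It is written against a hypothetical toy expression tree (`Lit`, `Add`, `Mul`) rather than LogicalPlan. Each rule uses a hand-written top-down `transformDown` that does not revisit rewritten nodes, so a single pass can leave work for the next one. That is exactly the situation FixedPoint exists for. Case-class equality stands in for fastEquals, and all names here are assumptions for illustration.

```scala
object MiniRuleExecutor {
  // Hypothetical toy tree standing in for LogicalPlan.
  sealed trait Expr
  final case class Lit(v: Int) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Mul(l: Expr, r: Expr) extends Expr

  abstract class Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  type Rule = Expr => Expr
  final case class Batch(name: String, strategy: Strategy, rules: Rule*)

  // Top-down rewrite: apply `pf` to a node, then recurse into the children of
  // the result. A rewritten node is not revisited within the same pass.
  def transformDown(e: Expr)(pf: PartialFunction[Expr, Expr]): Expr =
    pf.applyOrElse(e, (x: Expr) => x) match {
      case Add(l, r) => Add(transformDown(l)(pf), transformDown(r)(pf))
      case Mul(l, r) => Mul(transformDown(l)(pf), transformDown(r)(pf))
      case leaf      => leaf
    }

  val removeMulByOne: Rule = e => transformDown(e) {
    case Mul(x, Lit(1)) => x
    case Mul(Lit(1), x) => x
  }
  val foldAdd: Rule = e => transformDown(e) { case Add(Lit(a), Lit(b)) => Lit(a + b) }
  val foldMul: Rule = e => transformDown(e) { case Mul(Lit(a), Lit(b)) => Lit(a * b) }

  // Same shape as RuleExecutor.apply(): batches run in order; within a batch,
  // one pass is a left fold of the rules, repeated until the plan stops changing
  // or the strategy's budget is used up. `!=` stands in for fastEquals.
  def execute(batches: Seq[Batch], plan: Expr): Expr =
    batches.foldLeft(plan) { (start, batch) =>
      def onePass(p: Expr): Expr = batch.rules.foldLeft(p)((acc, rule) => rule(acc))
      var iteration = 1
      var last = start
      var cur = onePass(start)
      while (iteration < batch.strategy.maxIterations && cur != last) {
        last = cur
        cur = onePass(cur)
        iteration += 1
      }
      cur
    }

  // (1 + 2 + 3) * 1
  val example: Expr = Mul(Add(Add(Lit(1), Lit(2)), Lit(3)), Lit(1))

  def simplify(strategy: Strategy): Expr =
    execute(Seq(Batch("Simplify", strategy, removeMulByOne, foldAdd, foldMul)), example)
}
```

`simplify(FixedPoint(10))` folds `(1 + 2 + 3) * 1` all the way to `Lit(6)`. `simplify(Once)` stops after one pass at `Add(Lit(3), Lit(3))`, because the outer addition only becomes foldable after its left child has been folded.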
The concrete RuleExecutor implementations are described below.
Analyzer
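Analyzer transforms the initial unresolved logical plan into a resolved logical plan. This part of the walkthrough covers the whole analysis package.
Its job is to pin down unresolved attributes and relations using schema information (from the Catalog class) and the function registry (FunctionRegistry). The process has three steps, and the third step involves many iterations.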
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
* a [[FunctionRegistry]].
*/
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
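Below is a toy version of the resolution the Analyzer performs, assuming drastically simplified plan nodes. `UnresolvedRelation` and `UnresolvedAttribute` echo the names in the Scaladoc above. However, every class here, the `catalog` map standing in for Catalog, and the two rules are hypothetical. FunctionRegistry and case sensitivity are left out. The rules are deliberately ordered so that attribute resolution cannot make progress until the relation has been looked up, which is why resolution has to iterate.

```scala
object MiniAnalyzer {
  // Hypothetical, heavily simplified plan nodes (not Catalyst's classes).
  sealed trait Column
  final case class UnresolvedAttribute(name: String) extends Column
  final case class Attr(name: String, dataType: String) extends Column

  sealed trait Plan
  final case class UnresolvedRelation(table: String) extends Plan
  final case class Relation(table: String, output: Seq[Attr]) extends Plan
  final case class Project(columns: Seq[Column], child: Plan) extends Plan

  // Stand-in for the schema Catalog: table name -> typed columns.
  val catalog: Map[String, Seq[Attr]] =
    Map("people" -> Seq(Attr("name", "string"), Attr("age", "int")))

  // Rule: look an unresolved table reference up in the catalog.
  def resolveRelations(p: Plan): Plan = p match {
    case UnresolvedRelation(t) =>
      Relation(t, catalog.getOrElse(t, throw new NoSuchElementException(s"Table not found: $t")))
    case Project(cols, child) => Project(cols, resolveRelations(child))
    case other                => other
  }

  // Rule: bind names to typed attributes, but only once the child is resolved.
  def resolveReferences(p: Plan): Plan = p match {
    case Project(cols, child @ Relation(_, output)) =>
      val bound = cols.map {
        case UnresolvedAttribute(n) =>
          output.find(_.name == n).getOrElse(throw new NoSuchElementException(s"Cannot resolve $n"))
        case a: Attr => a
      }
      Project(bound, child)
    case Project(cols, child) => Project(cols, resolveReferences(child))
    case other                => other
  }

  // Apply both rules in order, repeating until nothing changes (at most maxIterations passes).
  def analyze(plan: Plan, maxIterations: Int = 100): Plan = {
    val rules = Seq[Plan => Plan](p => resolveReferences(p), p => resolveRelations(p))
    var cur = plan
    var changed = true
    var i = 0
    while (changed && i < maxIterations) {
      val next = rules.foldLeft(cur)((acc, rule) => rule(acc))
      changed = next != cur
      cur = next
      i += 1
    }
    cur
  }
}
```

Analyzing `Project(Seq(UnresolvedAttribute("age")), UnresolvedRelation("people"))` takes three passes. The first resolves the relation, the second binds `age` to `Attr("age", "int")`, and the third confirms that nothing changes.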
First, Catalog is a trait that records table information; it exists specifically for the Analyzer's use.
trait Catalog {
def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
}
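Here is the SimpleCatalog implementation, which is used in SQLContext. It keeps table names and their LogicalPlans in a HashMap whose lifetime matches the context, and provides functions to register, drop, and look up tables.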
class SimpleCatalog extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
tables += ((tableName, plan))
}
def dropTable(tableName: String) = tables -= tableName
def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan = {