Spark Catalyst Source Code Analysis (Part 1)

2014-11-24 00:14:04

Architecture


Parse the input SQL into an unresolved logical plan; for this step, see the SqlParser implementation.


Turn the unresolved logical plan into a resolved logical plan; for this step, see the analysis package.


Turn the resolved logical plan into an optimized logical plan; for this step, see the optimizer implementation.


Turn the optimized logical plan into a physical plan; for this step, see the QueryPlanner Strategy implementations.
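The four stages above can be sketched as a pipeline of plain functions (a toy illustration; the types and names here are invented stand-ins, not Spark's actual API):

```scala
// Toy sketch of Catalyst's four-stage pipeline (illustrative names only).
case class Plan(desc: String, resolved: Boolean = false, optimized: Boolean = false)

def parse(sql: String): Plan = Plan(s"parsed[$sql]")       // stands in for SqlParser
def analyze(p: Plan): Plan  = p.copy(resolved = true)      // stands in for analysis
def optimize(p: Plan): Plan = p.copy(optimized = true)     // stands in for the optimizer
def plan(p: Plan): String   = s"physical(${p.desc})"       // stands in for QueryPlanner

// Each stage consumes the previous stage's output.
val physical = plan(optimize(analyze(parse("SELECT 1"))))
```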

Source Code Module


Rule

Rule is an abstract class with a name, which defaults to the class name. There are many Rule implementations, scattered across the different processing phases (analysis, optimization).
RuleExecutor is the class that executes rules; its two concrete implementations are covered in detail below:
Analyzer
Optimizer

RuleExecutor supports two strategies: run once, or run repeatedly. They control when the convergence loop stops. The name Strategy feels a bit misleading here (it is unrelated to the query-planning Strategies).
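The shape of the Rule abstraction can be sketched roughly as follows (a simplified sketch, not the exact Spark source; the ruleName derivation shown is an assumption):

```scala
// Simplified sketch of Catalyst's Rule abstraction (illustrative, not the
// exact Spark source). A rule is a named transformation over a plan tree.
abstract class Rule[TreeType] {
  // Name of the rule; defaults to the implementing class's name.
  val ruleName: String = {
    val className = getClass.getName
    if (className.endsWith("$")) className.dropRight(1) else className
  }
  def apply(plan: TreeType): TreeType
}

// A toy rule over strings instead of plan trees: trim whitespace.
object TrimRule extends Rule[String] {
  def apply(plan: String): String = plan.trim
}
```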

/**
 * An execution strategy for rules that indicates the maximum number of executions. If the
 * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
 */
abstract class Strategy { def maxIterations: Int }

/** A strategy that only runs once. */
case object Once extends Strategy { val maxIterations = 1 }

/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy

RuleExecutor's Batch class and batches field:
/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

/** Defines a sequence of rule batches, to be overridden by the implementation. */
protected val batches: Seq[Batch]
A batch holds multiple Rules.
When apply() runs, the incoming plan is threaded through each batch with a left fold over the batch's rules, and the final plan is returned.
Convergence ends either when the strategy's maximum iteration count is reached or when two successive TreeNodes are equal. apply() works as follows:
/**
 * Executes the batches of rules defined by the subclass. The batches are executed serially
 * using the defined execution strategy. Within each batch, rules are also executed serially.
 */
def apply(plan: TreeType): TreeType = {
  var curPlan = plan

  batches.foreach { batch =>
    var iteration = 1
    var lastPlan = curPlan
    curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => rule(plan) }

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
      lastPlan = curPlan
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val result = rule(plan)
          if (!result.fastEquals(plan)) {
            logger.debug(...)
          }
          result
      }
      iteration += 1
    }
  }
  curPlan
}
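The batch loop above can be mimicked with a toy "plan" type to watch the fix-point behavior in action (a minimal sketch; Int stands in for TreeType and plain functions stand in for Rules):

```scala
// Minimal sketch mirroring RuleExecutor's batch loop, using Int as the
// "plan" and Int => Int functions as the "rules" (illustrative only).
abstract class Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy

case class Batch(name: String, strategy: Strategy, rules: (Int => Int)*)

def execute(batches: Seq[Batch], plan: Int): Int = {
  var curPlan = plan
  batches.foreach { batch =>
    var iteration = 1
    var lastPlan = curPlan
    curPlan = batch.rules.foldLeft(curPlan) { case (p, rule) => rule(p) }
    // Re-run the batch until the plan stops changing or maxIterations is hit.
    while (iteration < batch.strategy.maxIterations && curPlan != lastPlan) {
      lastPlan = curPlan
      curPlan = batch.rules.foldLeft(curPlan) { case (p, rule) => rule(p) }
      iteration += 1
    }
  }
  curPlan
}

// A "rule" that halves even numbers; it converges once the value is odd.
val halveIfEven: Int => Int = n => if (n % 2 == 0) n / 2 else n
// With FixedPoint: 40 -> 20 -> 10 -> 5, then no further change.
val result = execute(Seq(Batch("toy", FixedPoint(100), halveIfEven)), 40)
```

With Once instead of FixedPoint, the same batch would run a single pass and stop at 20, which is exactly the difference the two strategies encode.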

The concrete RuleExecutor implementations are introduced below.

Analyzer

The Analyzer turns the initial unresolved logical plan into a resolved logical plan. This part of the analysis covers the entire analysis package.

Its job is to pin down undetermined attributes and relations using schema information (from the Catalog class) and the function registry (FunctionRegistry). The process consists of three steps, the third of which involves many iterations.

/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
 * a [[FunctionRegistry]].
 */
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
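As a toy illustration of what "resolving" means here (invented names, not the Spark API): an unresolved table reference gets replaced by a typed relation looked up in a catalog.

```scala
// Toy illustration of resolution (not the Spark API): replace unresolved
// table references with concrete schemas looked up from a catalog map.
sealed trait Node
case class UnresolvedRelation(table: String) extends Node
case class Relation(table: String, schema: Seq[String]) extends Node

val catalog = Map("people" -> Seq("name", "age"))

def resolve(node: Node): Node = node match {
  case UnresolvedRelation(t) if catalog.contains(t) => Relation(t, catalog(t))
  case other => other // leave unknown references unresolved
}
```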

First, the Catalog trait records table information and exists specifically for the Analyzer's use.
trait Catalog {
  def lookupRelation(
    databaseName: Option[String],
    tableName: String,
    alias: Option[String] = None): LogicalPlan

  def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
}

Now look at the SimpleCatalog implementation, which is used in SQLContext. It keeps table names and their LogicalPlans in a HashMap whose lifetime matches the context, and it provides table registration, dropping, and lookup.
class SimpleCatalog extends Catalog {
  val tables = new mutable.HashMap[String, LogicalPlan]()

  def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = {
    tables += ((tableName, plan))
  }

  def dropTable(tableName: String) = tables -= tableName

  def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {