Spark Catalyst Source Code Analysis (Part 3)

2014-11-24 00:14:04
an): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) => // Case 1: unresolved relations
        UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
      case Subquery(alias, child) => Subquery(alias.toLowerCase, child) // Case 2: subqueries
      case q: LogicalPlan => q transformExpressions { // Case 3: every other kind of plan node
        case s: Star => s.copy(table = s.table.map(_.toLowerCase)) // the * wildcard
        case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase) // unresolved attributes
        case Alias(c, name) => Alias(c, name.toLowerCase)() // aliases
      }
    }
  }

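transform is a method provided by TreeNode that traverses the tree in pre-order. transformExpressions is defined on QueryPlan and applies the same pre-order transform to each expression held by the current plan node.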
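To make the difference between pre-order and post-order traversal concrete, here is a minimal sketch on a toy tree. Node, Leaf, Branch and TraversalDemo are hypothetical names invented for this illustration; they are not Catalyst's TreeNode, and the real implementation has more machinery.

```scala
// A toy tree for illustration only; this is NOT Catalyst's TreeNode.
sealed trait Node {
  def children: Seq[Node]
  def withChildren(newChildren: Seq[Node]): Node

  // Pre-order: apply the rule to this node first, then recurse into the children of the result.
  def transformDown(rule: PartialFunction[Node, Node]): Node = {
    val afterRule = rule.lift(this).getOrElse(this)
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }

  // Post-order: recurse into the children first, then apply the rule to the rebuilt node.
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val afterChildren = withChildren(children.map(_.transformUp(rule)))
    rule.lift(afterChildren).getOrElse(afterChildren)
  }
}

case class Leaf(name: String) extends Node {
  def children: Seq[Node] = Nil
  def withChildren(newChildren: Seq[Node]): Node = this
}

case class Branch(name: String, children: Seq[Node]) extends Node {
  def withChildren(newChildren: Seq[Node]): Node = copy(children = newChildren)
}

object TraversalDemo {
  // Records the order in which a traversal hands nodes to the rule.
  def visitOrder(traverse: (Node, PartialFunction[Node, Node]) => Node, tree: Node): List[String] = {
    val seen = scala.collection.mutable.ListBuffer.empty[String]
    traverse(tree, {
      case n @ Leaf(name) =>
        seen += name
        n
      case n @ Branch(name, _) =>
        seen += name
        n
    })
    seen.toList
  }
}
```

Calling TraversalDemo.visitOrder(_.transformDown(_), tree) and TraversalDemo.visitOrder(_.transformUp(_), tree) on the same tree shows the two orders side by side: the first reports each parent before its children, the second reports children before their parent.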

This rule shows the kinds of nodes a LogicalPlan can contain. Expressions will be covered in more detail later.

A note on Alias, taken from its source comment:

/**
 * Used to assign a new name to a computation.
 * For example the SQL expression "1 + 1 AS a" could be represented as follows:
 *  Alias(Add(Literal(1), Literal(1)), "a")()
 *

Batch Three

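Resolution is the third batch. Its termination condition is fixedPoint: the batch is re-run until the plan stops changing, with a cap of 100 iterations. The comments below are my own; they outline what each Rule does, and a few of the Rule implementations are then walked through.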

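Batch("Resolution", fixedPoint,
      ResolveReferences :: // resolve attributes
      ResolveRelations :: // resolve relations (from the catalog)
      NewRelationInstances :: // give fresh instances to relations that appear more than once
      ImplicitGenerate :: // turn a Project whose single expression is a Generator into a Generate operator
      StarExpansion :: // expand *
      ResolveFunctions :: // resolve functions (from the FunctionRegistry)
      GlobalAggregates :: // turn a Project that contains aggregate expressions into an Aggregate operator
      typeCoercionRules :_*) // from HiveTypeCoercion: several rules that apply Hive-style type coercion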
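The fixed-point strategy can be sketched in a few lines. This is a simplified illustration, not Catalyst's RuleExecutor: FixedPointDemo, Rule and runBatch are hypothetical names, and plans are modelled as plain strings so the example stays self-contained.

```scala
object FixedPointDemo {
  // Hypothetical stand-in for a Catalyst rule; a "plan" is just a String here.
  type Rule = String => String

  // Applies all rules in order, pass after pass, until a pass leaves the plan
  // unchanged or maxIterations passes have run.
  // Returns the final plan together with the number of passes executed.
  def runBatch(rules: Seq[Rule], plan: String, maxIterations: Int = 100): (String, Int) = {
    var current = plan
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    (current, iteration)
  }
}
```

With a rule that strips one leading "?" per pass, the plan "???x" settles at "x" once a pass makes no further change. Lowering maxIterations cuts the loop short and returns a plan that still contains unresolved parts, which is why the cap is a safety net rather than the normal exit.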

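ResolveReferences traverses the tree in post-order and resolves unresolved attributes. If an attribute cannot be resolved, it is left in place as unresolved so that a later iteration can try again.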

/**
   * Replaces [[UnresolvedAttribute]]s with concrete
   * [[expressions.AttributeReference AttributeReferences]] from a logical plan node's children.
   */
  object ResolveReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
      case q: LogicalPlan if q.childrenResolved =>
        logger.trace(s"Attempting to resolve ${q.simpleString}")
        q transformExpressions {
          case u @ UnresolvedAttribute(name) =>
            // Leave unchanged if resolution fails.  Hopefully will be resolved next round.
            val result = q.resolve(name).getOrElse(u)
            logger.debug(s"Resolving $u to $result")
            result
        }
    }
  }

The resolution itself is done by LogicalPlan's resolve method, which is covered in the LogicalPlan section. resolve is the one important method that LogicalPlan defines.
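The "leave it unchanged if resolution fails" pattern can be shown in isolation. The sketch below uses hypothetical types (Expr, Unresolved, Resolved) and a deliberately naive resolve that only matches on name; the real LogicalPlan.resolve does more than this.

```scala
object ResolveDemo {
  sealed trait Expr
  case class Unresolved(name: String) extends Expr
  case class Resolved(name: String, id: Int) extends Expr

  // Naive lookup (an assumption for this sketch): find an attribute with the same
  // name among the attributes the children produce, or None if there is none.
  def resolve(name: String, childOutput: Seq[Resolved]): Option[Resolved] =
    childOutput.find(_.name == name)

  // Replaces each unresolved attribute that can be resolved and keeps the rest
  // as they are, so a later pass can retry them.
  def resolveAll(exprs: Seq[Expr], childOutput: Seq[Resolved]): Seq[Expr] =
    exprs.map {
      case u @ Unresolved(name) => resolve(name, childOutput).getOrElse(u)
      case other => other
    }
}
```

Because a failed lookup returns the original node instead of raising an error, a pass over the expressions is always safe to repeat, which is what allows the rule to run inside a fixed-point batch.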


ResolveRelations looks relations up in the catalog.

/**
   * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
   */
  object ResolveRelations extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) =>
        catalog.lookupRelation(databaseName, name, alias)
    }
  }
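As a rough illustration of what a catalog lookup might do, here is a toy version. ToyCatalog, Table and the failure behaviour are assumptions made for this sketch; only the shape of the lookupRelation(databaseName, name, alias) call comes from the rule above.

```scala
object CatalogDemo {
  sealed trait Plan
  case class UnresolvedRelation(db: Option[String], name: String, alias: Option[String]) extends Plan
  case class Table(db: Option[String], name: String) extends Plan
  case class Subquery(alias: String, child: Plan) extends Plan

  // Hypothetical catalog that only knows which (database, table) pairs exist.
  class ToyCatalog(tables: Set[(Option[String], String)]) {
    // Assumption: an unknown table is an error, and the result is wrapped in a
    // Subquery named after the alias, or after the table when no alias is given.
    def lookupRelation(db: Option[String], name: String, alias: Option[String]): Plan = {
      if (!tables.contains((db, name))) sys.error(s"Table not found: $name")
      Subquery(alias.getOrElse(name), Table(db, name))
    }
  }

  // Replaces unresolved relations with whatever the catalog returns for them.
  def resolveRelations(plan: Plan, catalog: ToyCatalog): Plan = plan match {
    case UnresolvedRelation(db, name, alias) => catalog.lookupRelation(db, name, alias)
    case Subquery(alias, child) => Subquery(alias, resolveRelations(child, catalog))
    case other => other
  }
}
```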

A Generator is a kind of expression that produces zero or more rows from a single input row.

/**
   * When a SELECT clause has only a single expression and that expression is a
   * [[catalyst.expressions.Generator Generator]] we convert the
   * [[catalyst.plans.logical.Project Project]] to a [[catalyst.plans.logical.Generate Generate]].
   */
  object ImplicitGenerate extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Project(Seq(Alias(g: Generator, _)), child) =>
        Generate(g, join = false, outer = false, None, child)
    }
  }
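The "one row in, zero or more rows out" behaviour is easy to see in a toy form. ToyGenerator, Explode and generate below are hypothetical names for this sketch and are far simpler than Catalyst's Generator and Generate; the sketch keeps only the generator's output, roughly what join = false suggests.

```scala
object GeneratorDemo {
  type Row = Seq[Any]

  // Hypothetical, simplified generator: one input row in, zero or more rows out.
  trait ToyGenerator {
    def eval(input: Row): Seq[Row]
  }

  // Emits one single-column output row per element of the sequence stored in
  // column `ordinal`; a row whose column is not a sequence yields nothing.
  case class Explode(ordinal: Int) extends ToyGenerator {
    def eval(input: Row): Seq[Row] = input(ordinal) match {
      case items: Seq[_] => items.map(item => Seq[Any](item))
      case _ => Nil
    }
  }

  // Applies the generator to every input row and concatenates the results.
  def generate(g: ToyGenerator, rows: Seq[Row]): Seq[Row] = rows.flatMap(g.eval)
}
```

An input row whose sequence column is empty contributes no output rows at all, while a row with two elements contributes two, so the output row count is unrelated to the input row count.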

Resolving functions works much like resolving relations.

/**
   * Replaces [[UnresolvedFunction]]s with concrete [[expressions.Expression Expressions]].
   */
  object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan =>
        q transformExpressions {
          case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
            registry.lookupFunction(name