Spark Catalyst Source Code Analysis (Part 4)

2014-11-24 00:14:04


HiveTypeCoercion applies type coercion tailored to Hive's SQL semantics. Its rules are as follows:

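trait HiveTypeCoercion {
  // The coercion rules this trait contributes; each one is an object defined inside the trait
  val typeCoercionRules = List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings,
    BooleanComparisons, BooleanCasts, StringToIntegralCasts, FunctionArgumentConversion)

  // ... rule definitions such as ConvertNaNs ...
}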

Here is a simple example of how a rule matches expressions and replaces them:

  /**
   * Converts string "NaN"s that are in binary operators with NaN-able types (Float / Double)
   * to the appropriate numeric equivalent.
   */
  object ConvertNaNs extends Rule[LogicalPlan] {
    val stringNaN = Literal("NaN", StringType)

    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan => q transformExpressions {
        // Skip nodes whose children have not been resolved yet.
        case e if !e.childrenResolved => e

        /* Double Conversions */
        // Keep each operand in its original position: for non-commutative
        // operators (e.g. <, -) swapping the operands would change the result.
        case b: BinaryExpression if b.left == stringNaN && b.right.dataType == DoubleType =>
          b.makeCopy(Array(Literal(Double.NaN), b.right))
        case b: BinaryExpression if b.left.dataType == DoubleType && b.right == stringNaN =>
          b.makeCopy(Array(b.left, Literal(Double.NaN)))
        case b: BinaryExpression if b.left == stringNaN && b.right == stringNaN =>
          b.makeCopy(Array(Literal(Double.NaN), Literal(Double.NaN)))

        /* Float Conversions */
        case b: BinaryExpression if b.left == stringNaN && b.right.dataType == FloatType =>
          b.makeCopy(Array(Literal(Float.NaN), b.right))
        case b: BinaryExpression if b.left.dataType == FloatType && b.right == stringNaN =>
          b.makeCopy(Array(b.left, Literal(Float.NaN)))
        // A "NaN"/"NaN" case here would be unreachable: the Double case above already matches it.
      }
    }
  }
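The same rewrite can be shown outside Spark. Below is a minimal Scala sketch of the idea on a toy expression tree. The types `DataType`, `Expr`, `Lit`, `Col` and `BinOp`, and the helpers `transformDown` and `convertNaNs`, are hypothetical stand-ins for illustration, not Catalyst's API. The sketch keeps each operand in its original position, which matters for non-commutative operators such as `<` or `-`.

```scala
// Toy model of the ConvertNaNs idea. Every type here (DataType, Expr, Lit, Col,
// BinOp) is a hypothetical stand-in, not Catalyst's real class.
object ConvertNaNsSketch {
  sealed trait DataType
  case object StringType extends DataType
  case object DoubleType extends DataType
  case object FloatType extends DataType

  sealed trait Expr { def dataType: DataType }
  case class Lit(value: Any, dataType: DataType) extends Expr
  case class Col(name: String, dataType: DataType) extends Expr
  // Binary operator such as "<" or "-"; its result type is simplified to the left operand's type
  case class BinOp(op: String, left: Expr, right: Expr) extends Expr {
    def dataType: DataType = left.dataType
  }

  val stringNaN: Expr = Lit("NaN", StringType)

  // Pre-order rewrite: apply the rule to this node, then recurse into the children
  def transformDown(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr =
    rule.applyOrElse(e, identity[Expr]) match {
      case BinOp(op, l, r) => BinOp(op, transformDown(l)(rule), transformDown(r)(rule))
      case leaf            => leaf
    }

  // Replace a string "NaN" operand with the NaN of the other operand's type,
  // leaving both operands in their original positions
  val convertNaNs: PartialFunction[Expr, Expr] = {
    case BinOp(op, l, r) if l == stringNaN && r.dataType == DoubleType =>
      BinOp(op, Lit(Double.NaN, DoubleType), r)
    case BinOp(op, l, r) if l.dataType == DoubleType && r == stringNaN =>
      BinOp(op, l, Lit(Double.NaN, DoubleType))
    case BinOp(op, l, r) if l == stringNaN && r.dataType == FloatType =>
      BinOp(op, Lit(Float.NaN, FloatType), r)
    case BinOp(op, l, r) if l.dataType == FloatType && r == stringNaN =>
      BinOp(op, l, Lit(Float.NaN, FloatType))
  }
}
```

For example, `transformDown(BinOp("<", stringNaN, Col("x", DoubleType)))(convertNaNs)` yields a `BinOp("<", ...)` whose left operand is a `Double.NaN` literal and whose right operand is still `x`.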

Optimizer

The Optimizer turns the analyzed plan into an optimized plan. At present it is the only class in Catalyst's optimizer package, and SQLContext uses it directly.

As before, let's see which processing steps it contains:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueries) ::
    Batch("ConstantFolding", Once,
      ConstantFolding,
      BooleanSimplification,
      SimplifyCasts) ::
    Batch("Filter Pushdown", Once,
      EliminateSubqueries,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughInnerJoin) :: Nil
}
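A RuleExecutor runs its batches in order, and each batch applies its rules according to a strategy such as `Once`. Below is a simplified Scala sketch of that execution model. It assumes that a batch repeats its rules until the plan stops changing or the strategy's iteration limit is reached. `RuleExecutorSketch`, `runBatch` and `execute` are hypothetical names, and plans here are plain values compared with `==`.

```scala
// Simplified execution model for rule batches; names are hypothetical and
// only loosely modeled on Catalyst's Batch / Strategy idea.
object RuleExecutorSketch {
  // A rule is just a plan-to-plan function in this sketch
  type Rule[T] = T => T

  // How many passes a batch may make over the plan
  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  // One batch: apply its rules in order, repeating until the plan stops
  // changing or the strategy's iteration limit is reached
  def runBatch[T](plan: T, batch: Batch[T]): T = {
    @annotation.tailrec
    def loop(current: T, iteration: Int): T = {
      val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
      if (next == current || iteration >= batch.strategy.maxIterations) next
      else loop(next, iteration + 1)
    }
    loop(plan, 1)
  }

  // Batches run one after another, each starting from the previous result
  def execute[T](plan: T, batches: Seq[Batch[T]]): T =
    batches.foldLeft(plan)((p, batch) => runBatch(p, batch))
}
```

All three Optimizer batches above use `Once`, so each of them makes a single pass over the plan.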


Batch One

The first batch holds the subquery-related rules. It contains a single rule that eliminates subqueries: EliminateSubqueries

/**
 * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan.  Subqueries are
 * only required to provide scoping information for attributes and can be removed once analysis is
 * complete.
 */
object EliminateSubqueries extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child // every Subquery node is replaced by its child
  }
}

As the comment notes, subqueries can be removed once analysis is complete.
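The effect of EliminateSubqueries can be shown on a toy plan tree. In the sketch below, `Plan`, `Relation`, `Subquery`, `Filter`, `Project` and `transformUp` are hypothetical stand-ins, not Catalyst classes. The rewrite runs bottom-up so that nested aliases are also removed in a single pass.

```scala
// Toy logical plan; all names are hypothetical stand-ins for illustration.
object EliminateSubqueriesSketch {
  sealed trait Plan
  case class Relation(table: String) extends Plan
  case class Subquery(alias: String, child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Bottom-up rewrite: transform the children first, then offer the rebuilt node to the rule
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = plan match {
      case Subquery(alias, c) => Subquery(alias, transformUp(c)(rule))
      case Filter(cond, c)    => Filter(cond, transformUp(c)(rule))
      case Project(cols, c)   => Project(cols, transformUp(c)(rule))
      case leaf               => leaf
    }
    rule.applyOrElse(withNewChildren, identity[Plan])
  }

  // The alias only mattered while resolving attribute names; after analysis
  // each Subquery node can be replaced by its child
  val eliminateSubqueries: PartialFunction[Plan, Plan] = {
    case Subquery(_, child) => child
  }
}
```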


Batch Two

The second batch of rules: constant folding.

Batch("ConstantFolding", Once,
      ConstantFolding, // constant folding
      BooleanSimplification, // short-circuit boolean expressions early
      SimplifyCasts) // remove redundant Cast operations
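The batch above only names BooleanSimplification and SimplifyCasts. The sketch below illustrates what such rules typically do, using a hypothetical toy expression type rather than Catalyst's. In it, a boolean literal either decides an `And`/`Or` or drops out of it, and a cast to the type the child already has is removed. `BatchTwoSketch`, `simplifyBoolean`, `simplifyCasts` and `transformUp` are names invented for illustration.

```scala
// Toy expressions; all names are hypothetical, for illustration only.
object BatchTwoSketch {
  sealed trait Expr { def dataType: String }
  case class BoolLit(value: Boolean) extends Expr { val dataType = "boolean" }
  case class Col(name: String, dataType: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr { val dataType = "boolean" }
  case class Or(left: Expr, right: Expr) extends Expr { val dataType = "boolean" }
  case class Cast(child: Expr, dataType: String) extends Expr

  // Bottom-up rewrite: simplify the children first, then the node itself
  def transformUp(e: Expr)(f: Expr => Expr): Expr = f(e match {
    case And(l, r)   => And(transformUp(l)(f), transformUp(r)(f))
    case Or(l, r)    => Or(transformUp(l)(f), transformUp(r)(f))
    case Cast(c, to) => Cast(transformUp(c)(f), to)
    case leaf        => leaf
  })

  // BooleanSimplification idea: a literal operand decides or drops out of And/Or
  val simplifyBoolean: Expr => Expr = {
    case And(BoolLit(false), _) | And(_, BoolLit(false)) => BoolLit(false)
    case And(BoolLit(true), r)                           => r
    case And(l, BoolLit(true))                           => l
    case Or(BoolLit(true), _) | Or(_, BoolLit(true))     => BoolLit(true)
    case Or(BoolLit(false), r)                           => r
    case Or(l, BoolLit(false))                           => l
    case other                                           => other
  }

  // SimplifyCasts idea: a cast to the type the child already has is a no-op
  val simplifyCasts: Expr => Expr = {
    case Cast(child, to) if child.dataType == to => child
    case other                                   => other
  }
}
```

The boolean shortcuts remain valid under SQL's three-valued logic: `NULL AND false` is false and `NULL OR true` is true.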

Let's look at ConstantFolding in detail:
/**
 * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
 * equivalent [[catalyst.expressions.Literal Literal]] values.
 */
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.apply(null), e.dataType)
    }
  }
}
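ConstantFolding relies on `foldable`: an expression that reads no input row can be evaluated once at planning time and replaced by a literal. Below is a toy Scala sketch of that idea. The `Expr`, `Lit`, `Col`, `Add` and `Mul` types are hypothetical, and their `eval` takes a column-name map instead of Catalyst's row. The quoted rule passes `null` as the row; the sketch passes an empty map.

```scala
// Toy integer expressions; all names are hypothetical, for illustration only.
object ConstantFoldingSketch {
  sealed trait Expr {
    // True when the value does not depend on any input row
    def foldable: Boolean
    // Evaluate against one input row (column name -> value)
    def eval(row: Map[String, Int]): Int
  }
  case class Lit(value: Int) extends Expr {
    val foldable = true
    def eval(row: Map[String, Int]): Int = value
  }
  case class Col(name: String) extends Expr {
    val foldable = false
    def eval(row: Map[String, Int]): Int = row(name)
  }
  case class Add(left: Expr, right: Expr) extends Expr {
    def foldable: Boolean = left.foldable && right.foldable
    def eval(row: Map[String, Int]): Int = left.eval(row) + right.eval(row)
  }
  case class Mul(left: Expr, right: Expr) extends Expr {
    def foldable: Boolean = left.foldable && right.foldable
    def eval(row: Map[String, Int]): Int = left.eval(row) * right.eval(row)
  }

  // Top-down: literals are left alone, a foldable subtree is evaluated once
  // (with an empty row, since it reads no columns) and replaced by a literal,
  // and anything else is searched recursively
  def constantFold(e: Expr): Expr = e match {
    case l: Lit          => l
    case f if f.foldable => Lit(f.eval(Map.empty))
    case Add(l, r)       => Add(constantFold(l), constantFold(r))
    case Mul(l, r)       => Mul(constantFold(l), constantFold(r))
    case other           => other
  }
}
```

For example, `constantFold(Add(Col("x"), Mul(Lit(2), Add(Lit(3), Lit(4)))))` folds the constant subtree to `Lit(14)` and leaves the column reference intact.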

Here it is worth looking at how the foldable field is defined in the Expression class:

/**
   * Returns true when an expression is a candidate for static evaluation before the query is
   * executed.
   *
   * The following conditions are used to determine suitabi