Spark Catalyst Source Code Analysis (Part 5)

2014-11-24 00:14:04
lity for constant folding:
   *  - A [[expressions.Coalesce Coalesce]] is foldable if all of its children are foldable
   *  - A [[expressions.BinaryExpression BinaryExpression]] is foldable if its both left and right
   *    child are foldable
   *  - A [[expressions.Not Not]], [[expressions.IsNull IsNull]], or
   *    [[expressions.IsNotNull IsNotNull]] is foldable if its child is foldable.
   *  - A [[expressions.Literal]] is foldable.
   *  - A [[expressions.Cast Cast]] or [[expressions.UnaryMinus UnaryMinus]] is foldable if its
   *    child is foldable.
   */
  // TODO: Supporting more foldable expressions. For example, deterministic Hive UDFs.
  def foldable: Boolean = false

Only a Literal expression is foldable on its own. Every other expression is foldable only if each of its child expressions is foldable.
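The foldable rule above can be sketched with a toy expression tree. This is a minimal illustration, not Catalyst's code: `FoldableSketch`, `Attribute`, and `Add` are hypothetical stand-ins, and only the default `def foldable: Boolean = false` mirrors the source.

```scala
// Toy expression tree (hypothetical, NOT Catalyst's real classes).
object FoldableSketch {
  sealed trait Expr {
    // Same default as in the source: an expression is not foldable unless it says so.
    def foldable: Boolean = false
  }

  // A literal is always foldable.
  case class Literal(value: Any) extends Expr {
    override def foldable: Boolean = true
  }

  // A column reference depends on the input row, so it keeps the default (false).
  case class Attribute(name: String) extends Expr

  // A binary expression is foldable only if both children are foldable.
  case class Add(left: Expr, right: Expr) extends Expr {
    override def foldable: Boolean = left.foldable && right.foldable
  }

  // A unary expression is foldable only if its child is foldable.
  case class Not(child: Expr) extends Expr {
    override def foldable: Boolean = child.foldable
  }
}
```

With these definitions, `Add(Literal(1), Literal(2))` is foldable, while `Add(Literal(1), Attribute("a"))` is not, because one of its children depends on the input.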

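The second rule is also easy to understand: it simplifies boolean expressions. In other words, it short-circuits an expression early, whenever one side already determines the result.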

/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), r) => r
          case (l, Literal(true, BooleanType)) => l
          case (Literal(false, BooleanType), _) => Literal(false)
          case (_, Literal(false, BooleanType)) => Literal(false)
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)
          case (_, Literal(true, BooleanType)) => Literal(true)
          case (Literal(false, BooleanType), r) => r
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}
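To see the effect of these cases without a Spark dependency, here is a minimal sketch on a toy expression type. `BoolSimplifySketch`, `Lit`, and `Attr` are hypothetical names. The recursion simplifies children first, which is assumed to approximate what `transformExpressionsUp` does in the rule above.

```scala
// Toy boolean expressions (hypothetical, NOT Catalyst's real classes).
object BoolSimplifySketch {
  sealed trait Expr
  case class Lit(value: Boolean) extends Expr
  case class Attr(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr

  // Bottom-up: simplify both children first, then apply the same case table as the rule above.
  def simplify(e: Expr): Expr = e match {
    case And(l, r) => (simplify(l), simplify(r)) match {
      case (Lit(true), x)  => x          // true AND x  => x
      case (x, Lit(true))  => x          // x AND true  => x
      case (Lit(false), _) => Lit(false) // false AND _ => false
      case (_, Lit(false)) => Lit(false) // _ AND false => false
      case (x, y)          => And(x, y)
    }
    case Or(l, r) => (simplify(l), simplify(r)) match {
      case (Lit(true), _)  => Lit(true)  // true OR _  => true
      case (_, Lit(true))  => Lit(true)  // _ OR true  => true
      case (Lit(false), x) => x          // false OR x => x
      case (x, Lit(false)) => x          // x OR false => x
      case (x, y)          => Or(x, y)
    }
    case other => other
  }
}
```

For example, `simplify(And(Attr("a"), Or(Lit(true), Attr("b"))))` first reduces the inner `Or` to `Lit(true)` and then the outer `And` to `Attr("a")`.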

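The third rule removes every Cast operation that is unnecessary, that is, a cast whose input already has the target type.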

/**
 * Removes [[catalyst.expressions.Cast Casts]] that are unnecessary because the input is already
 * the correct type.
 */
object SimplifyCasts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Cast(e, dataType) if e.dataType == dataType => e
  }
}
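The guard `e.dataType == dataType` is the whole rule. A minimal sketch on a toy type system, where `SimplifyCastsSketch`, `IntType`, `StringType`, and `Attr` are hypothetical stand-ins rather than Catalyst classes:

```scala
// Toy typed expressions (hypothetical, NOT Catalyst's real classes).
object SimplifyCastsSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType

  sealed trait Expr { def dataType: DataType }
  case class Attr(name: String, dataType: DataType) extends Expr
  case class Cast(child: Expr, dataType: DataType) extends Expr

  // Drop a cast when its (already simplified) child has the target type; keep it otherwise.
  def simplify(e: Expr): Expr = e match {
    case Cast(child, target) =>
      val c = simplify(child)
      if (c.dataType == target) c else Cast(c, target)
    case other => other
  }
}
```

A cast from int to int disappears, while a cast from int to string is kept because it still changes the type.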

Batch Three

A batch of filter pushdown rules:

Batch("Filter Pushdown", Once,
      EliminateSubqueries, // eliminate subqueries
      CombineFilters, // merge adjacent filters into one
      PushPredicateThroughProject, // push predicates down through a projection
      PushPredicateThroughInnerJoin) // push predicates down through an inner join

I won't go through each of them in detail.
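As one concrete illustration from this batch, the idea behind CombineFilters can be sketched on a toy plan: two stacked filters collapse into a single filter whose condition is the conjunction of both. All names here (`CombineFiltersSketch`, `Pred`, `Relation`) are hypothetical, and the sketch is not the Catalyst implementation.

```scala
// Toy logical plan (hypothetical, NOT Catalyst's real classes).
object CombineFiltersSketch {
  sealed trait Cond
  case class Pred(text: String) extends Cond
  case class And(left: Cond, right: Cond) extends Cond

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Filter(condition: Cond, child: Plan) extends Plan

  // Merge Filter(outer, Filter(inner, child)) into Filter(inner AND outer, child).
  def combine(plan: Plan): Plan = plan match {
    case Filter(outer, child) => combine(child) match {
      case Filter(inner, grandChild) => Filter(And(inner, outer), grandChild)
      case other                     => Filter(outer, other)
    }
    case other => other
  }
}
```

Because the child is combined before the parent is examined, a stack of three or more filters also ends up as one filter in a single pass.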

SQLContext

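This RuleExecutor implementation in SQLContext already operates on the physical execution plan, SparkPlan. It is just another RuleExecutor implementation that registers its own batches, as follows: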
/**
   * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
   * inserting shuffle operations as needed.
   */
  @transient
  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
    val batches =
      Batch("Add exchange", Once, AddExchange) ::
      Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
  }
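How a RuleExecutor drives its batches can be sketched in a few lines. This is a simplified model under stated assumptions, not the Catalyst implementation: `RuleExecutorSketch` and the `maxIterations` field are hypothetical, and `FixedPoint` is included only as a contrast to the `Once` strategy that appears in the batches above.

```scala
// Minimal model of rules, batches and an executor (hypothetical, NOT Catalyst's real classes).
object RuleExecutorSketch {
  trait Rule[T] { def apply(plan: T): T }

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  abstract class RuleExecutor[T] {
    def batches: Seq[Batch[T]]

    // Run the batches in order; inside a batch, re-run all rules until the plan
    // stops changing or the strategy's iteration limit is reached.
    def apply(plan: T): T = batches.foldLeft(plan) { (current, batch) =>
      var result = current
      var iteration = 0
      var changed = true
      while (changed && iteration < batch.strategy.maxIterations) {
        val next = batch.rules.foldLeft(result)((p, rule) => rule(p))
        changed = next != result
        result = next
        iteration += 1
      }
      result
    }
  }
}
```

In this model a batch registered with `Once` applies each of its rules exactly one time, which matches how the "Add exchange" and "Prepare Expressions" batches are declared.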


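That covers the rules package and the RuleExecutor implementations found in various places. Of these, Analyze and Optimize are what Catalyst itself currently provides, and the SQL component uses them as-is.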

TreeNode

The TreeNode library supports three features:

Scala collection like methods (foreach, map, flatMap, collect, etc)

transform: accepts a partial function that is used to generate a new tree.

debugging support: pretty printing, easy splicing of trees, etc.

Collection operations
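A minimal sketch of what collection-like methods on a tree can look like. `TreeCollectionSketch` and `Node` are hypothetical, and the pre-order traversal is an assumption made for the illustration, not a statement about Catalyst's implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy tree with collection-like methods (hypothetical, NOT Catalyst's TreeNode).
object TreeCollectionSketch {
  case class Node(name: String, children: Seq[Node] = Nil) {
    // Pre-order traversal: visit this node, then each subtree from left to right.
    def foreach(f: Node => Unit): Unit = {
      f(this)
      children.foreach(_.foreach(f))
    }

    // Apply f to every node and return the results in traversal order.
    def map[A](f: Node => A): Seq[A] = {
      val buf = ArrayBuffer.empty[A]
      foreach(n => buf += f(n))
      buf.toSeq
    }

    // Keep only the nodes on which the partial function is defined.
    def collect[A](pf: PartialFunction[Node, A]): Seq[A] = {
      val buf = ArrayBuffer.empty[A]
      foreach(n => if (pf.isDefinedAt(n)) buf += pf(n))
      buf.toSeq
    }
  }
}
```

With this, a whole plan can be queried like a collection, for example collecting the names of all leaf nodes with `collect { case n if n.children.isEmpty => n.name }`.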

Partial functions
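A rough sketch of how a transform driven by a partial function can work: the rule is applied where it is defined, every other node is left unchanged, and the traversal continues into the children. `TransformSketch`, `Lit`, `Attr`, and `Add` are hypothetical, and the pre-order traversal is an assumption of this sketch.

```scala
// Toy expressions with a transform helper (hypothetical, NOT Catalyst's TreeNode).
object TransformSketch {
  sealed trait Expr
  case class Lit(value: Int) extends Expr
  case class Attr(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Apply the rule to the node if it is defined there (otherwise keep the node),
  // then recurse into the children of whatever node results.
  def transform(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(e, identity[Expr])
    afterRule match {
      case Add(l, r) => Add(transform(l)(rule), transform(r)(rule))
      case leaf      => leaf
    }
  }
}
```

The caller only writes the cases it cares about, for example `transform(expr) { case Add(Lit(x), Lit(y)) => Lit(x + y) }`; nodes that match no case pass through untouched.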


Inheritance structure

Globally unique id

object TreeNode {
  private val currentId = new java.util.concurrent.atomic.AtomicLong
  protected def nextId() = currentId.getAndIncrement()
}
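A small sketch of what a per-node id from an AtomicLong counter gives you. `IdSketch` and `Leaf` are hypothetical, and placing the id in the class body rather than in the constructor is an assumption of this sketch.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy id generator and node (hypothetical, NOT Catalyst's real classes).
object IdSketch {
  private val currentId = new AtomicLong

  // Thread-safe: every call returns a distinct, increasing value.
  def nextId(): Long = currentId.getAndIncrement()

  // The id lives in the class body, not in the constructor parameters,
  // so case-class equality (which only compares `name`) ignores it.
  case class Leaf(name: String) {
    val id: Long = nextId()
  }
}
```

Two nodes built from the same arguments then compare equal with `==` but still carry different ids, so the id can be used to tell apart two instances that look structurally identical.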

Kinds of nodes

/**
 * A [[TreeNode]] that has two children, [[left]] and [[right]].
 */
trait BinaryNode[BaseType <: TreeNode[BaseType]] {
  def left: BaseType
  def right: BaseType

  def children = Seq(left, right)
}

/**
 * A [[TreeNode]] with no children.
 */
trait LeafNode[BaseType <: TreeNode[BaseType]] {
  def children = Nil
}

/**
 * A [[TreeNode]] with a single [[child]].
 */
trait UnaryNode[BaseType <: TreeNode[BaseType]] {
  def child: BaseType
  def children = child :: Nil
}
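How these three traits are meant to be mixed into concrete nodes can be sketched as follows. The `TreeNode` base class here is reduced to a single abstract `children` member plus a hypothetical `size` helper, and `Plan`, `Relation`, `Filter`, and `Join` are illustrative names, not the real Catalyst operators.

```scala
// Simplified node hierarchy (hypothetical, NOT Catalyst's real classes).
object NodeKindsSketch {
  abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
    def children: Seq[BaseType]
    // Hypothetical helper: number of nodes in this subtree.
    def size: Int = 1 + children.map(_.size).sum
  }

  trait LeafNode[BaseType <: TreeNode[BaseType]] {
    def children: Seq[BaseType] = Nil
  }
  trait UnaryNode[BaseType <: TreeNode[BaseType]] {
    def child: BaseType
    def children: Seq[BaseType] = child :: Nil
  }
  trait BinaryNode[BaseType <: TreeNode[BaseType]] {
    def left: BaseType
    def right: BaseType
    def children: Seq[BaseType] = Seq(left, right)
  }

  // Each concrete node only declares its fields; the mixed-in trait supplies `children`.
  abstract class Plan extends TreeNode[Plan]
  case class Relation(name: String) extends Plan with LeafNode[Plan]
  case class Filter(condition: String, child: Plan) extends Plan with UnaryNode[Plan]
  case class Join(left: Plan, right: Plan) extends Plan with BinaryNode[Plan]
}
```

Generic code such as `size` needs only `children`, so it works for every node kind without knowing whether a node has zero, one, or two children.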

Each node has a unique id, which means that during comparison, different branches