Spark Catalyst Source Code Analysis (Part 6)

2014-11-24 00:14:04
…on the tree, even nodes that look structurally identical are not the same node. The two comparison methods are:

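  // Identity: every node carries its own id, so two separately built nodes are never
  // the same instance, even when their contents are identical.
  def sameInstance(other: TreeNode[_]): Boolean = {
    this.id == other.id
  }

  // Cheap identity check first, then fall back to ==, which for the case-class
  // nodes is structural equality.
  def fastEquals(other: TreeNode[_]): Boolean = {
    sameInstance(other) || this == other
  }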
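To see the difference between the two checks, here is a minimal sketch on a hypothetical `IdNode` case class (not Catalyst's TreeNode). It assumes, as `sameInstance` implies, that every node receives a unique id when it is constructed; in this sketch the id comes from a made-up global `NodeIds` counter. Because `id` is declared in the class body rather than in the constructor, it does not take part in the case-class `==`.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical global counter: the assumed source of a unique id for every node.
object NodeIds {
  private val counter = new AtomicLong(0L)
  def next(): Long = counter.getAndIncrement()
}

// Toy node; as a case class, == compares (name, children) structurally.
case class IdNode(name: String, children: Seq[IdNode] = Nil) {
  // Declared in the body, so it is excluded from the generated equals/hashCode.
  val id: Long = NodeIds.next()

  // True only when both sides are the very same constructed node.
  def sameInstance(other: IdNode): Boolean = this.id == other.id

  // Cheap id comparison first, then structural equality.
  def fastEquals(other: IdNode): Boolean = sameInstance(other) || this == other
}
```

Two separately built `IdNode("x")` values are therefore `==` and `fastEquals`, but not `sameInstance`.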

foreach applies the function to the node itself first, then recursively to each of its children:

  def foreach(f: BaseType => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }

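map applies the function once to every node in pre-order and collects the results into a Seq:

  def map[A](f: BaseType => A): Seq[A] = {
    val ret = new collection.mutable.ArrayBuffer[A]()
    foreach(ret += f(_)) // i.e. foreach(node => ret += f(node))
    ret
  }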
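A minimal, self-contained sketch of the two methods above on a hypothetical `Tree` case class, used only to show the visiting order. As in the excerpt, `map` is built on `foreach`, so it returns results in the same pre-order.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy tree, used only to show the visiting order.
case class Tree(name: String, children: Seq[Tree] = Nil) {
  // Pre-order: handle this node first, then each child subtree from left to right.
  def foreach(f: Tree => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }

  // Built on foreach, so the results come back in the same pre-order.
  def map[A](f: Tree => A): Seq[A] = {
    val ret = ArrayBuffer.empty[A]
    foreach(node => ret += f(node))
    ret.toSeq
  }
}
```

For a root `a` whose children are `b` (which itself has a child `d`) and `c`, both methods visit `a, b, d, c`.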

Many of the other transformations work the same way: they take a function or a partial function and apply it to every matching node.

Grouped by category, the transformations are:

map, flatMap, collect

mapChildren, withNewChildren

transform, transformDown, transformChildrenDown (pre-order)

transformUp, transformChildrenUp (post-order)
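The difference between the pre-order and post-order families matters when a rule can only fire after its inputs have been rewritten. The sketch below uses a hypothetical `Expr`/`Lit`/`Add` tree (not Catalyst's Expression classes) and a constant-folding partial function. Its `transformDown` and `transformUp` follow the pre-order/post-order contract described above, but the implementation is simplified; for example, it always rebuilds nodes even when nothing changed.

```scala
// Hypothetical toy expression tree, not Catalyst's Expression hierarchy.
sealed trait Expr {
  def children: Seq[Expr]
  def withNewChildren(newChildren: Seq[Expr]): Expr

  // Pre-order: try the rule on this node first, then recurse into the (possibly new) children.
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.lift(this).getOrElse(this)
    afterRule.withNewChildren(afterRule.children.map(_.transformDown(rule)))
  }

  // Post-order: rewrite the children first, then try the rule on the rebuilt node.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterChildren = withNewChildren(children.map(_.transformUp(rule)))
    rule.lift(afterChildren).getOrElse(afterChildren)
  }
}

case class Lit(value: Int) extends Expr {
  def children: Seq[Expr] = Nil
  def withNewChildren(newChildren: Seq[Expr]): Expr = this
}

case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def withNewChildren(newChildren: Seq[Expr]): Expr = Add(newChildren(0), newChildren(1))
}

object ConstantFolding {
  // Only matches an Add whose two operands are already literals.
  val rule: PartialFunction[Expr, Expr] = {
    case Add(Lit(a), Lit(b)) => Lit(a + b)
  }
}
```

On `Add(Add(Lit(1), Lit(2)), Lit(3))`, `transformUp` folds all the way to `Lit(6)` in one pass, while `transformDown` tries the rule on the root before its left operand has been folded and stops at `Add(Lit(3), Lit(3))`.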

That is essentially the whole set: TreeNode provides ordered traversal and transformation of the tree and all of its child nodes.


Plan

[Figure: QueryPlan inheritance hierarchy]

QueryPlan provides three things.

First, it defines output, the sequence of attributes that this node produces for its consumers:

def output: Seq[Attribute]


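Second, building on TreeNode's transform methods, it implements a family of transformExpressions methods, which apply a partial function to this node's expressions and to all of their sub-expressions.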

Third, the expressions method returns a Seq[Expression] that collects all the expressions held by this plan node.
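The three pieces can be illustrated with a small, hypothetical plan hierarchy: `Plan`, `Scan`, `Filter` and the tiny expression tree are all made up for this sketch. Each node lists its `expressions` explicitly, and `transformExpressions` rewrites only the expression trees held by the node itself, leaving child plans untouched.

```scala
// Hypothetical expression tree with a pre-order transform (stand-in for Catalyst's Expression).
sealed trait Expr {
  def mapChildren(f: Expr => Expr): Expr
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.lift(this).getOrElse(this)
    afterRule.mapChildren(_.transformDown(rule))
  }
}
case class Attr(name: String) extends Expr {
  def mapChildren(f: Expr => Expr): Expr = this
}
case class Lit(value: Int) extends Expr {
  def mapChildren(f: Expr => Expr): Expr = this
}
case class GreaterThan(left: Expr, right: Expr) extends Expr {
  def mapChildren(f: Expr => Expr): Expr = GreaterThan(f(left), f(right))
}

// Hypothetical plan nodes mirroring the three QueryPlan members described above.
sealed trait Plan {
  def output: Seq[Attr]      // attributes this node produces
  def expressions: Seq[Expr] // expressions held directly by this node
  def transformExpressions(rule: PartialFunction[Expr, Expr]): Plan
}

case class Scan(table: String, output: Seq[Attr]) extends Plan {
  def expressions: Seq[Expr] = Nil
  def transformExpressions(rule: PartialFunction[Expr, Expr]): Plan = this
}

case class Filter(condition: Expr, child: Plan) extends Plan {
  def output: Seq[Attr] = child.output
  def expressions: Seq[Expr] = Seq(condition)
  // Rewrites this node's condition (and its sub-expressions); the child plan is kept as-is.
  def transformExpressions(rule: PartialFunction[Expr, Expr]): Plan =
    copy(condition = condition.transformDown(rule))
}
```

With `Filter(GreaterThan(Attr("age"), Lit(18)), scan)`, `output` is the scan's `Seq(Attr("age"))`, and rewriting literals with `{ case Lit(v) => Lit(v + 1) }` yields a filter on `age > 19` while the `Scan` child is returned unchanged.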

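In Catalyst, QueryPlan is implemented by LogicalPlan; in the SQL core module it is implemented by SparkPlan. The former is what gets analyzed and optimized, while the latter is what is actually executed. Both are briefly introduced below.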


Logical Plan

On top of QueryPlan, LogicalPlan adds the following members:

1. references: the set of attributes this node refers to in order to produce its output attributes

def references: Set[Attribute]

2. inputSet: the set of all attributes output by this node's children

lazy val inputSet: Set[Attribute] = children.flatMap(_.output).toSet

3. Whether the node itself and its children are resolved

4. The resolve method. It is important, but takes some effort to read:

  def resolve(name: String): Option[NamedExpression] = {
    val parts = name.split("\\.")
    // Collect all attributes that are output by this nodes children where either the first part
    // matches the name or where the first part matches the scope and the second part matches the
    // name.  Return these matches along with any remaining parts, which represent dotted access to
    // struct fields.
    val options = children.flatMap(_.output).flatMap { option =>
      // If the first part of the desired name matches a qualifier for this possible match, drop it.
      val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts
      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
    }

    options.distinct match {
      case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
      // One match, but we also need to extract the requested nested field.
      case (a, nestedFields) :: Nil =>
        a.dataType match {
          case StructType(fields) =>
            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
          case _ => None // Don't know how to resolve these field references
        }
      case Nil => None         // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
    }
  }
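The matching rules are easier to see without Catalyst's expression types. The following sketch re-implements the same steps over a hypothetical `ToyAttr` (name, qualifiers, struct field names) instead of Attribute, GetField and Alias: split the name on dots, drop the first part if it is one of the attribute's qualifiers, require the next part to equal the attribute name, treat whatever remains as a nested-field path, and fail if more than one distinct candidate is left. It also checks that something remains after dropping the qualifier, a case in which `remainingParts.head` in the excerpt above would throw.

```scala
// Hypothetical attribute: a column name, the qualifiers (e.g. table aliases) it may be
// prefixed with, and the field names if the column is a struct.
case class ToyAttr(name: String, qualifiers: Set[String], structFields: Seq[String] = Nil)

// Possible outcomes of resolving a dotted name.
sealed trait Resolution
case class Column(attr: ToyAttr) extends Resolution
case class NestedField(attr: ToyAttr, path: List[String]) extends Resolution

object ToyResolver {
  def resolve(name: String, input: Seq[ToyAttr]): Option[Resolution] = {
    val parts = name.split("\\.").toList
    // For each candidate: drop a matching qualifier, then require the next part to be its name.
    val options = input.toList.flatMap { attr =>
      val remaining = if (attr.qualifiers.contains(parts.head)) parts.tail else parts
      if (remaining.nonEmpty && attr.name == remaining.head) List((attr, remaining.tail)) else Nil
    }
    options.distinct match {
      case (attr, Nil) :: Nil => Some(Column(attr)) // one match, plain column
      case (attr, path) :: Nil if attr.structFields.nonEmpty =>
        Some(NestedField(attr, path))                // one match plus nested field access
      case _ :: Nil => None                          // nested access on a non-struct column
      case Nil      => None                          // no match
      case ambiguous =>
        throw new IllegalArgumentException(
          s"Ambiguous references to $name: ${ambiguous.mkString(",")}")
    }
  }
}
```

For example, with one `a` qualified by `t` and another `a` qualified by `u`, `"t.a"` resolves to the first one, while a bare `"a"` is reported as ambiguous.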

LogicalPlan has three abstract subclasses:

/**
 * A logical plan node with no children.
 */
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>
  // Leaf nodes by definition cannot reference any input attributes.
  def references = Set.empty
}

/**
 * A logical plan node with single child.
 */
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}

/**
 * A logical plan node with a left and right child.
 */
abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}
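A compact sketch of how the three node shapes relate to inputSet and references. All classes here (`Col`, `Plan`, `Leaf`, `Unary`, `Binary`, `Relation`, `Project`, `CrossJoin`) are hypothetical stand-ins, and `output`/`references` are deliberately simplified; only `inputSet` uses the same definition as the LogicalPlan member listed earlier.

```scala
// Hypothetical attribute type; a real Catalyst Attribute carries much more information.
case class Col(name: String)

sealed trait Plan {
  def children: Seq[Plan]
  def output: Seq[Col]
  def references: Set[Col]
  // Same definition as LogicalPlan.inputSet: everything the children produce.
  lazy val inputSet: Set[Col] = children.flatMap(_.output).toSet
}

// No children, so it cannot reference any input attributes.
abstract class Leaf extends Plan {
  def children: Seq[Plan] = Nil
  def references: Set[Col] = Set.empty
}

abstract class Unary extends Plan {
  def child: Plan
  def children: Seq[Plan] = Seq(child)
}

abstract class Binary extends Plan {
  def left: Plan
  def right: Plan
  def children: Seq[Plan] = Seq(left, right)
}

case class Relation(name: String, output: Seq[Col]) extends Leaf

case class Project(projectList: Seq[Col], child: Plan) extends Unary {
  def output: Seq[Col] = projectList
  def references: Set[Col] = projectList.toSet
}

// A join without a condition references nothing itself; it just concatenates both sides.
case class CrossJoin(left: Plan, right: Plan) extends Binary {
  def output: Seq[Col] = left.output ++ right.output
  def references: Set[Col] = Set.empty
}
```

A leaf such as `Relation` has an empty `inputSet` and, as the LeafNode comment says, no references; a `CrossJoin` of two relations sees the union of both outputs as its input, and a `Project` above it can only reference attributes from that set.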

Next, let's look at the concrete implementations under each of LogicalPlan's three node types: LeafNode, UnaryNode, and BinaryNode.


LeafNode
/**
 * A logical node that represents a non-query command to be executed by the system.  For example,
 * commands can be used by parsers to represent DDL operations.
 */
a