Spark Catalyst Source Code Analysis (Part 7)

2014-11-24 00:14:04
abstract class Command extends LeafNode {
  self: Product =>
  def output = Seq.empty
}

/**
 * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
 * commands that are passed directly to another system.
 */
case class NativeCommand(cmd: String) extends Command

/**
 * Returned by a parser when the users only wants to see what query plan would be executed, without
 * actually performing the execution.
 */
case class ExplainCommand(plan: LogicalPlan) extends Command

case object NoRelation extends LeafNode {
  def output = Nil
}
Command and BaseRelation have further implementations in the sql.hive package.

The role of MetastoreRelation is explained in the Hive section.

Command is not covered further here.
UnaryNode

BinaryNode

Spark Plan

The SparkPlan class hierarchy is shown in the following figure:

[Figure: SparkPlan class hierarchy]

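The basicOperators file in the SQL module's execution package contains many SparkPlan implementations, including:

Project, Filter, Sample, Union, StopAfter, TopK, Sort, ExistingRdd

Many of these duplicate the operators in Catalyst's basicOperators. The difference is that SparkPlan is an implementation of QueryPlan and, unlike a logical plan, a SparkPlan is what actually gets executed: Spark's Strategy implementations turn logical plans into SparkPlans, which are then run. That is why these case classes in the SQL module's basicOperators have an execute() method that their Catalyst counterparts lack.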
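To make the logical/physical split concrete, here is a minimal Scala sketch. It is a toy model, not Spark code: the names LogicalOp, PhysicalOp, ExistingRows, FilterExec and toPhysical are hypothetical, and a plain Seq[Row] stands in for the RDD[Row] that SparkPlan.execute() returns in Spark. The only point it makes is that physical nodes carry an execute() method while logical nodes do not.

```scala
// Toy model only: all names are hypothetical, and Seq[Row] stands in for RDD[Row].
type Row = Seq[Any]

// Logical operators only describe what to compute; they have no execute().
sealed trait LogicalOp
case class LogicalScan(rows: Seq[Row]) extends LogicalOp
case class LogicalFilter(condition: Row => Boolean, child: LogicalOp) extends LogicalOp

// Physical operators add execute(), which actually produces the rows.
sealed trait PhysicalOp {
  def execute(): Seq[Row]
}
case class ExistingRows(rows: Seq[Row]) extends PhysicalOp {
  def execute(): Seq[Row] = rows
}
case class FilterExec(condition: Row => Boolean, child: PhysicalOp) extends PhysicalOp {
  def execute(): Seq[Row] = child.execute().filter(condition)
}

// One-to-one translation, standing in for what the planner's strategies do.
def toPhysical(plan: LogicalOp): PhysicalOp = plan match {
  case LogicalScan(rows)          => ExistingRows(rows)
  case LogicalFilter(cond, child) => FilterExec(cond, toPhysical(child))
}

val input: Seq[Row] = Seq(Seq(1, "a"), Seq(2, "b"), Seq(3, "c"))
val logical = LogicalFilter(row => row.head.asInstanceOf[Int] > 1, LogicalScan(input))
val result = toPhysical(logical).execute()
```

Here toPhysical plays the role that the planner's strategies play in Spark; no filtering happens until execute() is called on the physical tree.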

For the concrete Spark strategy implementations, see the next subsection.


Planning


Query Planner

QueryPlanner is responsible for turning a logical plan into a physical plan; it holds a list of Strategy implementations to do so.


abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** A list of execution strategies that can be used by the planner */
  def strategies: Seq[Strategy]

  /**
   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
   * be used for execution. If this strategy does not apply to the given logical operation then an
   * empty list should be returned.
   */
  abstract protected class Strategy extends Logging {
    def apply(plan: LogicalPlan): Seq[PhysicalPlan]
  }

  /**
   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
   * filled in automatically by the QueryPlanner using the other execution strategies that are
   * available.
   */
  protected def planLater(plan: LogicalPlan) = apply(plan).next()

  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...
    val iter = strategies.view.flatMap(_(plan)).toIterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }
}
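A self-contained sketch of the same planner shape, under stated assumptions: the plan types (Relation, Join, Scan, HashJoin, NestedLoopJoin) and the ToyPlanner/Planner names are hypothetical, and a strategy is modeled as a plain function instead of a nested class. What it keeps from the code above is the ordered strategies list, the lazy view.flatMap in apply, and planLater re-entering the planner to plan child nodes.

```scala
// Toy plan types (hypothetical); only the planner shape mirrors QueryPlanner.
sealed trait Logical
case class Relation(name: String) extends Logical
case class Join(left: Logical, right: Logical, equi: Boolean) extends Logical

sealed trait Physical
case class Scan(name: String) extends Physical
case class HashJoin(left: Physical, right: Physical) extends Physical
case class NestedLoopJoin(left: Physical, right: Physical) extends Physical

abstract class ToyPlanner {
  // A strategy returns zero or more candidate physical plans; Nil means "does not apply".
  type Strategy = Logical => Seq[Physical]

  def strategies: Seq[Strategy]

  // Plan a child by re-entering the planner and taking its first candidate.
  protected def planLater(plan: Logical): Physical = apply(plan).next()

  def apply(plan: Logical): Iterator[Physical] = {
    // The view keeps flatMap lazy, so strategies are tried in order, on demand.
    val iter = strategies.view.flatMap(s => s(plan)).iterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }
}

object Planner extends ToyPlanner {
  val scans: Strategy = {
    case Relation(name) => Seq(Scan(name))
    case _              => Nil
  }
  val equiJoins: Strategy = {
    case Join(l, r, true) => Seq(HashJoin(planLater(l), planLater(r)))
    case _                => Nil
  }
  val anyJoin: Strategy = {
    case Join(l, r, _) => Seq(NestedLoopJoin(planLater(l), planLater(r)))
    case _             => Nil
  }
  // Order matters: callers normally take only the first candidate.
  val strategies: Seq[Strategy] = Seq(scans, equiJoins, anyJoin)
}
```

For an equi-join, Planner(plan).next() yields the HashJoin candidate; the later anyJoin strategy only runs if earlier strategies return Nil or a caller asks the iterator for more candidates.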

QueryPlanner impl

The current implementation is SparkStrategies.

SQLContext uses it through SparkPlanner:

protected[sql] class SparkPlanner extends SparkStrategies {
  val sparkContext = self.sparkContext

  val strategies: Seq[Strategy] =
    TopK ::
    PartialAggregation ::
    SparkEquiInnerJoin ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil
}

HiveContext uses a SparkPlanner that mixes in the Hive strategies:

val hivePlanner = new SparkPlanner with HiveStrategies {
  val hiveContext = self

  override val strategies: Seq[Strategy] = Seq(
    TopK,
    ColumnPrunings,
    PartitionPrunings,
    HiveTableScans,
    DataSinks,
    Scripts,
    PartialAggregation,
    SparkEquiInnerJoin,
    BasicOperators,
    CartesianProduct,
    BroadcastNestedLoopJoin
  )
}

Strategy & impl

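Strategy implementations fall into two groups: Spark strategies and Hive strategies. The former correspond closely to the classes in the sql.execution package; the latter are extra strategies layered on top of the Spark ones.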
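The Hive planner keeps every Spark strategy and inserts Hive-specific ones near the front of the list. A minimal sketch of that layering with Scala trait mixins, assuming plans are plain strings so only the ordering mechanics are visible; BasePlanner, HiveLikeStrategies, PlainPlanner, HivePlanner and metastoreScan are hypothetical names, with metastoreScan merely standing in for strategies such as HiveTableScans.

```scala
// Hypothetical names throughout; plans are plain strings to keep the focus on ordering.
trait BasePlanner {
  type Strategy = String => Seq[String]

  val genericScan: Strategy = table => Seq(s"GenericScan($table)")

  def strategies: Seq[Strategy] = Seq(genericScan)

  // The first strategy that yields a candidate wins, as in QueryPlanner.apply.
  def plan(table: String): String =
    strategies.view.flatMap(s => s(table)).headOption
      .getOrElse(sys.error(s"No plan for $table"))
}

trait HiveLikeStrategies extends BasePlanner {
  // Only applies to tables that (in this toy) live in the metastore.
  val metastoreScan: Strategy = table =>
    if (table.startsWith("hive.")) Seq(s"MetastoreScan($table)") else Nil

  // Layer the extra strategy in front of the inherited ones.
  override def strategies: Seq[Strategy] = metastoreScan +: super.strategies
}

object PlainPlanner extends BasePlanner
object HivePlanner extends BasePlanner with HiveLikeStrategies
```

One design choice differs from the HiveContext code: strategies is a def here so the mixin can call super.strategies and prepend to it, whereas HiveContext overrides it with a val that spells out the full list.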

Expression

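An Expression has several properties:

1. It carries a DataType, and provides some inline helper methods for DataType conversions.

2. It carries references, a Set[Attribute]; Attribute is a subclass of NamedExpression.

3. foldable: whether the expression can be evaluated statically, before the query runs.

Among Expressions, only Literal is foldable. Literal is a LeafExpression; its companion object builds a Literal whose DataType matches the type of the value passed in: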

object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case null => Literal(null, NullType)
  }
}

case class Literal(value: Any, dataType: DataType) extends LeafExpression {

  override def foldable = true
  def nullable = value == null
  def references = Set.empty

  override def toString = if (value != null) value.toString else "null"

  type EvaluatedType = Any
  override def apply(input: Row): Any = value // evaluating this leaf expression simply returns value
}

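4. resolved: by default this depends on whether all of the expression's children are resolved.

children is a TreeNode concept. In TreeNode it is a Seq[BaseType], where BaseType is the type parameter of TreeNode[BaseType]. For Expression, which is a TreeNode[Expression], BaseType is Expression.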
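The four properties above can be tied together in a small self-contained model. This is a sketch, not Catalyst code: Node, Expr, Lit, Attr, Add and fold are hypothetical names, rows are a Map[String, Int], and the rule that Add is foldable when both operands are is an assumption of the sketch. It shows children typed through the F-bounded parameter (BaseType = Expr), resolved defaulting to the children's resolution, and foldable subtrees being replaced by a literal.

```scala
// Hypothetical toy model; much simpler than Catalyst's TreeNode/Expression.

// F-bounded type parameter: for Expr, BaseType is Expr, so children is a Seq[Expr].
abstract class Node[BaseType <: Node[BaseType]] { self: BaseType =>
  def children: Seq[BaseType]
}

sealed abstract class Expr extends Node[Expr] {
  def foldable: Boolean = false
  // Default rule: an expression is resolved once all of its children are.
  def resolved: Boolean = children.forall(_.resolved)
  def eval(row: Map[String, Int]): Int
}

case class Lit(value: Int) extends Expr {
  def children: Seq[Expr] = Nil
  override def foldable: Boolean = true         // a constant can be evaluated statically
  def eval(row: Map[String, Int]): Int = value  // ignores the input row, like Literal
}

// An attribute counts as resolved only once it is bound to a known column.
case class Attr(name: String, bound: Boolean) extends Expr {
  def children: Seq[Expr] = Nil
  override def resolved: Boolean = bound
  def eval(row: Map[String, Int]): Int = row(name)
}

case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  // Assumption of this sketch: a sum is foldable when both operands are.
  override def foldable: Boolean = left.foldable && right.foldable
  def eval(row: Map[String, Int]): Int = left.eval(row) + right.eval(row)
}

// Constant folding: replace every foldable subtree with a literal of its value.
def fold(e: Expr): Expr =
  if (e.foldable) Lit(e.eval(Map.empty))
  else e match {
    case Add(l, r) => Add(fold(l), fold(r))
    case other     => other
  }
```

fold evaluates a foldable subtree against an empty row, which is safe here only because a foldable expression never reads its input, mirroring how Literal's apply ignores input.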


Expression class hierarchy

Its abstract subclasses are as follows:

abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
  self: Product =>
  def symbol: String