Spark Catalyst 源码分析(二)

2014-11-24 00:14:04 · 作者: · 浏览: 7
val table = tables.get(tableName).getOrElse(sys.error(s"Table Not Found: $tableName")) // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are // properly qualified with this alias. alias.map(a => Subquery(a.toLowerCase, table)).getOrElse(table) } }
在查找的时候可以代入一个别名,会把他包装成一个Subquery。Subquery是个简单的case class。
case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  def output = child.output.map(_.withQualifiers(alias :: Nil))
  def references = Set.empty
}

FunctionRegistry类似于Catalog,记录的是函数,在hive package里,处理的是Hive的UDF
trait FunctionRegistry {
  def lookupFunction(name: String, children: Seq[Expression]): Expression
}

FunctionRegistry的实现在Catalyst里目前只有一个(在Hive模块里有实现,具体在最后一节Hive内),如下,如果你要查找方法,就会抛异常。
/**
 * A trivial catalog that returns an error when a function is requested.  Used for testing when all
 * functions are already filled in and the analyser needs only to resolve attribute references.
 */
object EmptyFunctionRegistry extends FunctionRegistry {
  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
    throw new UnsupportedOperationException
  }
}

回到Analyzer,SQLContext在使用Analyzer前,这样生成:
@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog
protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)

接下来看Catalyst现在的Analyzer作为一个RuleExecutor,已经实现的功能:
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )

下面分别分析三个batch里面的Rule做的事情。
Batch One

首先是第一个batch里的NewRelationInstance这条Rule,他的作用就是避免一个逻辑计划上同一个实例出现多次,如果出现就生成一个新的plan,保证每个表达式id都唯一。

/**
 * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
 * that each instance has unique expression ids for the attributes produced.
 */
object NewRelationInstances extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val localRelations = plan collect { case l: MultiInstanceRelation => l} // 这一步是搜集所有的多实例关系
    val multiAppearance = localRelations
      .groupBy(identity[MultiInstanceRelation])
      .filter { case (_, ls) => ls.size > 1 }
      .map(_._1)
      .toSet // 这一步是做过滤

    plan transform { // 这一步是把原来plan里的多实例关系,凡是出现多个,就变成一个新的单一实例
      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
    }
  }
}

LogicalPlan本身是TreeNode的子类,TreeNode具备collect等一些scala collection操作的能力,这个例子里第一步搜集的过程中体现了collect能力。

TreeNode是Catalyst里的重要基础类,后面有小节会具体讲。
Batch Two

第二个batch是大小写相关的,如果对大小写不敏感,那么就执行LowercaseAttributeReferences这条Rule,会把所有的属性都变成小写

/**
   * Makes attribute naming case insensitive by turning all UnresolvedAttributes to lowercase.
   */
  object LowercaseAttributeReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPl