An alias can be supplied at lookup time, in which case the relation is wrapped in a Subquery. Subquery is a simple case class:
case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
  def output = child.output.map(_.withQualifiers(alias :: Nil))
  def references = Set.empty
}
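To make the effect of withQualifiers concrete, here is a minimal sketch using simplified stand-ins. Attr, Plan and Table are hypothetical types invented for this illustration and are not Catalyst's real classes; only the shape of Subquery.output follows the snippet above.

```scala
object SubquerySketch {
  // Hypothetical, simplified stand-in for a Catalyst attribute.
  case class Attr(name: String, qualifiers: Seq[String] = Nil) {
    def withQualifiers(qs: Seq[String]): Attr = copy(qualifiers = qs)
  }

  // Hypothetical, simplified stand-in for LogicalPlan.
  sealed trait Plan { def output: Seq[Attr] }
  case class Table(output: Seq[Attr]) extends Plan

  case class Subquery(alias: String, child: Plan) extends Plan {
    // Every output attribute of the child is re-qualified with the alias.
    def output: Seq[Attr] = child.output.map(_.withQualifiers(alias :: Nil))
  }

  // Roughly what "SELECT ... FROM people t" would wrap the relation in.
  val aliased: Plan = Subquery("t", Table(Seq(Attr("id"), Attr("name"))))
}
```

With this sketch, every attribute in SubquerySketch.aliased.output carries the qualifier "t", which is what lets a reference such as t.id be matched later during resolution.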
FunctionRegistry is similar to Catalog, except that it records functions. Its implementation in the hive package handles Hive UDFs.
trait FunctionRegistry {
  def lookupFunction(name: String, children: Seq[Expression]): Expression
}
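Catalyst itself currently ships only one implementation of FunctionRegistry, shown below, which throws an exception on any function lookup. (The Hive module has a real implementation, covered in the final section on Hive.)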
/**
 * A trivial catalog that returns an error when a function is requested. Used for testing when all
 * functions are already filled in and the analyser needs only to resolve attribute references.
 */
object EmptyFunctionRegistry extends FunctionRegistry {
  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
    throw new UnsupportedOperationException
  }
}
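For contrast with EmptyFunctionRegistry, the following is a minimal sketch of what a non-trivial registry could look like. MapFunctionRegistry, the builder map, and the toy Literal and Add expressions are all hypothetical names made up for this illustration; this is not the Hive implementation, only the FunctionRegistry trait signature is taken from the text above.

```scala
object FunctionRegistrySketch {
  // Hypothetical toy expression types standing in for Catalyst's Expression tree.
  sealed trait Expression
  case class Literal(value: Int) extends Expression
  case class Add(left: Expression, right: Expression) extends Expression

  // Same shape as the trait shown in the article.
  trait FunctionRegistry {
    def lookupFunction(name: String, children: Seq[Expression]): Expression
  }

  // Hypothetical registry: maps a function name to a builder that turns
  // the argument expressions into a concrete expression node.
  class MapFunctionRegistry(builders: Map[String, Seq[Expression] => Expression])
      extends FunctionRegistry {
    def lookupFunction(name: String, children: Seq[Expression]): Expression =
      builders.get(name) match {
        case Some(build) => build(children)
        case None =>
          throw new UnsupportedOperationException(s"unknown function: $name")
      }
  }

  // Builder for a two-argument "add" function (assumption: exactly two arguments).
  val addBuilder: Seq[Expression] => Expression = {
    case Seq(l, r) => Add(l, r)
  }

  val registry: FunctionRegistry = new MapFunctionRegistry(Map("add" -> addBuilder))
}
```

Looking up "add" with two literals yields an Add node, while an unregistered name fails in the same way EmptyFunctionRegistry does for every name.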
Back to the Analyzer. SQLContext creates it like this before using it:
@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog
protected[sql] lazy val analyzer: Analyzer =
  new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
Next, let's look at what Catalyst's current Analyzer, which is a RuleExecutor, already implements:
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)
  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )
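The difference between the Once and FixedPoint strategies is easier to see in a stripped-down executor. The sketch below is an assumption-laden toy, not Catalyst's RuleExecutor: the generic Rule alias, the maxIterations field, and the integer "plan" are invented for illustration. It only demonstrates the general idea that a batch reapplies its rules in order until the plan stops changing or the iteration limit is reached.

```scala
object RuleExecutorSketch {
  // Hypothetical: a rule is just a function from plan to plan.
  type Rule[T] = T => T

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  // Run the batches in order. Within a batch, apply all rules in sequence and
  // repeat until nothing changes or the strategy's iteration limit is hit.
  def execute[T](batches: Seq[Batch[T]], plan: T): T =
    batches.foldLeft(plan) { (start, batch) =>
      var current = start
      var iteration = 0
      var changed = true
      while (changed && iteration < batch.strategy.maxIterations) {
        val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
        changed = next != current
        current = next
        iteration += 1
      }
      current
    }

  // Toy "plan": a single integer, so the effect of each batch is easy to follow.
  val addOne: Rule[Int] = n => n + 1
  val halveEvens: Rule[Int] = n => if (n > 0 && n % 2 == 0) n / 2 else n

  val result: Int = execute(
    Seq(
      Batch[Int]("AddOne", Once, addOne),             // runs exactly once
      Batch[Int]("Halve", FixedPoint(100), halveEvens) // runs until stable
    ),
    47
  )
}
```

In this toy, the Once batch turns 47 into 48 a single time, and the FixedPoint batch keeps halving until the value is odd and no rule changes it any more. The Resolution batch in the Analyzer follows the same pattern because resolving one node can make another node resolvable on the next pass.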
The following sections go through what the Rules in each of the three batches do.
Batch One
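First is the NewRelationInstances Rule in the first batch. It prevents the same relation instance from appearing more than once in a logical plan: if one does, a new instance is generated for each occurrence, so that every expression id is unique.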
/**
 * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
 * that each instance has unique expression ids for the attributes produced.
 */
object NewRelationInstances extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // Step 1: collect every multi-instance relation in the plan.
    val localRelations = plan collect { case l: MultiInstanceRelation => l}
    // Step 2: keep only the relations that appear more than once.
    val multiAppearance = localRelations
      .groupBy(identity[MultiInstanceRelation])
      .filter { case (_, ls) => ls.size > 1 }
      .map(_._1)
      .toSet
    // Step 3: replace each occurrence of such a relation with a new instance.
    plan transform {
      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
    }
  }
}
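LogicalPlan is itself a subclass of TreeNode, and TreeNode supports collect and several other operations modeled on Scala collections. The first step of this example, gathering the relations, shows collect in use.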
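The collect-then-transform pattern can be tried out on a small self-contained tree. Everything in the sketch below (Node, Relation, Join, the integer id, and the counter used to hand out fresh ids) is a hypothetical simplification and not Catalyst's TreeNode API; it only mirrors the three steps of NewRelationInstances.

```scala
object TreeSketch {
  // Hypothetical minimal tree with collection-like collect and transform.
  sealed trait Node {
    def children: Seq[Node]
    def withChildren(cs: Seq[Node]): Node

    // Pre-order traversal: gather a result for every node the partial function matches.
    def collect[B](pf: PartialFunction[Node, B]): Seq[B] =
      pf.lift(this).toList ++ children.flatMap(_.collect(pf))

    // Apply the rule to this node if it matches, then recurse into the children.
    def transform(rule: PartialFunction[Node, Node]): Node = {
      val after = rule.applyOrElse(this, identity[Node])
      after.withChildren(after.children.map(_.transform(rule)))
    }
  }

  case class Relation(name: String, id: Int) extends Node {
    def children: Seq[Node] = Nil
    def withChildren(cs: Seq[Node]): Node = this
  }

  case class Join(left: Node, right: Node) extends Node {
    def children: Seq[Node] = Seq(left, right)
    def withChildren(cs: Seq[Node]): Node = Join(cs(0), cs(1))
  }

  // Toy analogue of NewRelationInstances: duplicated relations get fresh ids.
  def newRelationInstances(plan: Node): Node = {
    val relations = plan.collect { case r: Relation => r }
    val multiAppearance =
      relations.groupBy(identity).filter { case (_, rs) => rs.size > 1 }.keySet
    // Assumption for this sketch: fresh ids come from a simple counter
    // starting above the largest id already in the plan.
    var nextId = relations.map(_.id).foldLeft(0)(_ max _)
    plan.transform {
      case r: Relation if multiAppearance.contains(r) =>
        nextId += 1
        r.copy(id = nextId)
    }
  }

  // A self-join: the same relation instance appears twice.
  val selfJoin: Node = Join(Relation("t", 1), Join(Relation("t", 1), Relation("u", 2)))
  val fixed: Node = newRelationInstances(selfJoin)
}
```

After the rewrite, the two occurrences of relation t in the self-join carry different ids, while u, which appears only once, is left untouched.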
Batch Two
The second batch deals with case sensitivity. If the analyzer is case-insensitive, the LowercaseAttributeReferences Rule runs and converts all attribute names to lowercase.
/**
* Makes attribute naming case insensitive by turning all UnresolvedAttributes to lowercase.
*/
object LowercaseAttributeReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPl