Spark Catalyst 源码分析(九)

2014-11-24 00:14:04 · 作者: · 浏览: 2
override def foldable = left.foldable && right.foldable def references = left.references ++ right.references override def toString = s"($left $symbol $right)" } abstract class LeafExpression extends Expression with trees.LeafNode[Expression] { self: Product => } abstract class UnaryExpression extends Expression with trees.UnaryNode[Expression] { self: Product => def references = child.references }

Expression impl


SchemaRDD

SchemaRDD是一个RDD[Row],Row在Catalyst对应的是Table里的一行,定义是

trait Row extends Seq[Any] with Serializable

SchemaRDD就两部分实现,还有几个SQLContext的方法调用

一是RDD的Function的实现

  // =========================================================================================
  // RDD functions: Copy the interal row representation so we present immutable data to users.
  // =========================================================================================
  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    firstParent[Row].compute(split, context).map(_.copy())

  override def getPartitions: Array[Partition] = firstParent[Row].partitions

  override protected def getDependencies: Seq[Dependency[_]] =
    List(new OneToOneDependency(queryExecution.toRdd))  // 该SchemaRDD与优化后的RDD是窄依赖

二是DSL function的实现,如

def select(exprs: NamedExpression*): SchemaRDD =
    new SchemaRDD(sqlContext, Project(exprs, logicalPlan))

每次DSL的操作会转化成为新的SchemaRDD,

SchemaRDD的DSL操作与Catalyst组件提供的操作的对应关系为

\

DSL Operator的实现都依赖Catalyst的basicOperator,basicOperator里的操作都是LogicalPlan的继承类,主要分两类,一元UnaryNode和二元BinaryNode操作。而UnaryNode和BinaryNode都是TreeNode的实现,TreeNode里还有一种就是LeafNode。

basicOperator的各种实现都是caseclass,都是LogicalPlan,不具备execute能力


\


Hive


Hive Context

HiveContext是Spark SQL执行引擎之一,将hive数据结合到Spark环境中,读取的配置在hive-site.xml里指定。

继承关系

\

HiveContext里的sql parser使用的是HiveQl,

执行hql的时候,runHive方法接收cmd,且设置了最大返回行数

protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String]

调用的方法是hive里的类,返回结果存在java的ArrayList里

错误日志会记录在outputBuffer里,用于打印输出


逻辑执行计划的几个步骤仍然类似SqlContext,因为QueryExecution也继承了过来

abstract class QueryExecution extends super.QueryExecution {

区别在于使用的实例不一样,且toRdd操作逻辑不一样


Hive Catalog

\

使用HiveMetastoreCatalog存表信息

HiveMetastoreCatalog内,通过HiveContext的hiveconf,创建了hiveclient,所以可以进行getTable,getPartition,createTable操作

HiveMetastoreCatalog内的MetastoreRelation,继承结构如下

\

通过hive的接口创建了Table,Partition,TableDesc,并带一个隐式转换HiveMetastoreTypes类,因为在把Schema里的Field转成Attribute的过程中,借助HiveMetastoreTypes的toDataType把Catalyst支持的DataType parse成hive支持的类型


Hive QL

参考HiveQl类

Hive UDF

object HiveFunctionRegistry
  extends analysis.FunctionRegistry with HiveFunctionFactory with HiveInspectors {

继承FunctionRegistry,实现的是lookupFunction方法


HiveFunctionFactory主要做反射的事情,以及把hive的类型转化成为catalyst type

包括

  def getFunctionInfo(name: String) = FunctionRegistry.getFunctionInfo(name)
  def getFunctionClass(name: String) = getFunctionInfo(name).getFunctionClass
  def createFunction[UDFType](name: String) =
    getFunctionClass(name).newInstance.asInstanceOf[UDFType]

HiveInspectors是Catalyst DataType和Hive ObjectInspector的转化


Java类到Catalyst dataType的转化

def javaClassToDataType(clz: Class[_]): DataType = clz match 

Hive Strategy

val hivePlanner = new SparkPlanner with HiveStrategi