Expression impl
略SchemaRDD
SchemaRDD是一个RDD[Row],Row在Catalyst对应的是Table里的一行,定义是
trait Row extends Seq[Any] with Serializable
SchemaRDD就两部分实现,还有几个SQLContext的方法调用
一是RDD的Function的实现
// =========================================================================================
// RDD functions: Copy the interal row representation so we present immutable data to users.
// =========================================================================================
override def compute(split: Partition, context: TaskContext): Iterator[Row] =
firstParent[Row].compute(split, context).map(_.copy())
override def getPartitions: Array[Partition] = firstParent[Row].partitions
override protected def getDependencies: Seq[Dependency[_]] =
List(new OneToOneDependency(queryExecution.toRdd)) // 该SchemaRDD与优化后的RDD是窄依赖
二是DSL function的实现,如
def select(exprs: NamedExpression*): SchemaRDD =
new SchemaRDD(sqlContext, Project(exprs, logicalPlan))
每次DSL的操作会转化成为新的SchemaRDD,
SchemaRDD的DSL操作与Catalyst组件提供的操作的对应关系为
DSL Operator的实现都依赖Catalyst的basicOperator,basicOperator里的操作都是LogicalPlan的继承类,主要分两类,一元UnaryNode和二元BinaryNode操作。而UnaryNode和BinaryNode都是TreeNode的实现,TreeNode里还有一种就是LeafNode。
basicOperator的各种实现都是caseclass,都是LogicalPlan,不具备execute能力

Hive
Hive Context
HiveContext是Spark SQL执行引擎之一,将hive数据结合到Spark环境中,读取的配置在hive-site.xml里指定。
继承关系
HiveContext里的sql parser使用的是HiveQl,
执行hql的时候,runHive方法接收cmd,且设置了最大返回行数
protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String]
调用的方法是hive里的类,返回结果存在java的ArrayList里
错误日志会记录在outputBuffer里,用于打印输出
逻辑执行计划的几个步骤仍然类似SqlContext,因为QueryExecution也继承了过来
abstract class QueryExecution extends super.QueryExecution {
区别在于使用的实例不一样,且toRdd操作逻辑不一样
Hive Catalog
使用HiveMetastoreCatalog存表信息
HiveMetastoreCatalog内,通过HiveContext的hiveconf,创建了hiveclient,所以可以进行getTable,getPartition,createTable操作
HiveMetastoreCatalog内的MetastoreRelation,继承结构如下
通过hive的接口创建了Table,Partition,TableDesc,并带一个隐式转换HiveMetastoreTypes类,因为在把Schema里的Field转成Attribute的过程中,借助HiveMetastoreTypes的toDataType把Catalyst支持的DataType parse成hive支持的类型
Hive QL
参考HiveQl类Hive UDF
object HiveFunctionRegistry
extends analysis.FunctionRegistry with HiveFunctionFactory with HiveInspectors {
继承FunctionRegistry,实现的是lookupFunction方法
HiveFunctionFactory主要做反射的事情,以及把hive的类型转化成为catalyst type
包括
def getFunctionInfo(name: String) = FunctionRegistry.getFunctionInfo(name)
def getFunctionClass(name: String) = getFunctionInfo(name).getFunctionClass
def createFunction[UDFType](name: String) =
getFunctionClass(name).newInstance.asInstanceOf[UDFType]
HiveInspectors是Catalyst DataType和Hive ObjectInspector的转化
Java类到Catalyst dataType的转化
def javaClassToDataType(clz: Class[_]): DataType = clz match
Hive Strategy
val hivePlanner = new SparkPlanner with HiveStrategi