介绍
Spork是Pig on Spark的highly experimental版本,依赖的版本也比较久,如之前文章里所说,目前我把Spork维护在自己的github上:flare-spork。 本文分析的是Spork的实现方式和具体内容。Spark Launcher
在hadoop executionengine包路径下,写了一个Spark启动器,同MapReduceLauncher类似,会在launchPig的时候,把传入的物理执行计划进行翻译。 MR启动器翻译的是MR的操作,以及进一步的MR JobControl。而Spark启动器将物理执行计划的部分物理操作直接翻译成了RDD的操作。 有一个缺点是翻译成RDD算子之后,缺少优化过程,也就是直接物理操作的映射翻译,具体执行逻辑会完全交给Spark DAGScheduler去切分,由TaskScheduler去调度任务。 比如对Pig来说,直到见到Dump/Store,才会触发整个翻译和launch。那么在这一次物理执行计划中,对应到Spark可能是多次任务。在目前的实现方式下,翻译物理操作交给多个Convertor的实现类来完成,
public interface POConverter抽象类POConvertor提供了convert方法,输入参数中的List{ RDD convert(List > rdd, T physicalOperator) throws IOException; }
在使用的时候,以-x spark的方式就可以启动以Spark为backend engine的Pig环境。
下面具体看目前做了哪些PO操作的转化工作,具体怎么转化的。
Load/Store
走的都是NewHadoopRDD路线。
Load方面是通过POLoad获得文件路径,pigContext获得必要配置信息,然后交由SparkContext调用newAPIHadoopFile来获得NewHadoopRDD,最后把Tuple2
Store方面是先把最近的前驱rdd转会成Key为空Text的Tuple2
Foreach、Filter、Limit
ForEach里实现一个Iterator[T] => Iterator[T]的方法,把foreach转化为rdd.mapPartitions()方法。
Iterator[T]=> Iterator[T]方法的实现,会依赖原本的POForEach来获得nextTuple和进行一些别的操作,来实现一个新的Iterator。
对于hadoop backend的executionengine里的抽象类PhysicalOperator来说,
setInput()和attachInput()方法是放入带处理的tuple数据,
getNextTuple()的时候触发processTuple(),处理对象就是内部的Input Tuple。
所以ForEach操作实现Iterator的时候,在readNext()方法里掺入了以上设置Input数据的操作,在返回前调用getNextTuple()返回处理后的结果。
POFilter也是通过setInput()和attachInput()以及getNextTuple()来返回处理结果。
所以在实现为RDD操作的时候,把以上步骤包装成一个FilterFunction,传入rdd.filter(Function)处理。
POLimit同POFilter是完全一样的。
Distinct
现在RDD已经直接具备distinct(numPartitions: Int)方法了。
这里的distinct实现同rdd里的distinct逻辑是完全一样的。
第一步:把类型为Tuple的rdd映射成为Tuple2
第二步:进行rdd.reduceByKey(merge_function, parallelism)操作,merge_function对两个value部分的Object不做任何处理,也就是按key reduce且不对value部分处理;
第三步:对第二步的结果进行rdd.map(function, ClassTag)处理,function为得到Tuple2
Union
Union是一次求并过程,直接new UnionRDD
由于UnionRDD处理的是Seq
Sort
Sort过程:
第一步:把Tuple类型的RDD转成Tuple2
第二步:根据第一步结果,new OrderedRDDFunctions
,其sortByKey方法产出一个排过序的RDD
第三步:调用rdd.mapPartition(function, xx, xx),function作用为把Iterator
Split
POSplit的处理是直接返回第一个祖先RDD。
LocalRearrange
LocalRearrange -> Global Rearrange -> Package是一同出现的。
Local rearrange直接依赖
physicalOperator.setInputs(null); physicalOperator.attachInput(t); result = physicalOperator.getNextTuple();
三步得到result。返回的Tuple格式为(index, key, value)。
依赖POLocalRearrange本身内部对input tuple的处理。
GlobalRearrange
待处理的Tuple格式是(index, key, value)。最后结果为(key, { values })
如果父RDD只有一个:
先进行按key进行一次groupBy,得到结果是Tuple2
然后做一次map操作,得到(key, { values })形态的RDD,即Tuple
如果父RDD有多个:
让通过rdd的m