vate double avg() {
return total / (double) num;
}
public static void main(String[] args) {
SparkConf sc = new SparkConf().setAppName("Contains");
JavaSparkContext javaSparkContext = new JavaSparkContext(sc);
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = javaSparkContext.parallelize(data);
AvgCount initial = new AvgCount(0, 0);
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
}
}
运行结果
[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class AvgCount ~/Spark_RDD_Aggregate.jar
...
19/09/18 15:28:19 INFO DAGScheduler: Job 0 finished: aggregate at AvgCount.java:43, took 0.517385 s
3.0
...
常用的行动操作整理
collect() |
返回RDD 中的所有元素 |
rdd.collect() |
{1, 2, 3, 3} |
count() |
RDD 中的元素个数 |
rdd.count() |
4 |
countByValue() |
各元素在RDD 中出现的次数 |
rdd.countByValue() |
{(1, 1), (2, 1), (3, 2)} |
take(num) |
从RDD 中返回num 个元素 |
rdd.take(2) |
{1, 2} |
top(num) |
从RDD 中返回最前面的num 个元素 |
rdd.top(2) |
{3, 3} |
takeOrdered(num) (ordering) |
从RDD 中按照提供的顺序返 回最前面的num 个元素 |
rdd.takeOrdered(2)(myOrdering) |
{3, 3} |
takeSample(withReplace ment, num, [seed]) |
从RDD 中返回任意一些元素 |
rdd.takeSample(false, 1) |
非确定的 |
reduce(func) |
并行整合RDD 中所有数据 (例如sum) |
rdd.reduce((x, y) => x + y) |
9 |
fold(zero)(func) |
和reduce() 一样, 但是需要 提供初始值 |
rdd.fold(0)((x, y) => x + y) |
9 |
aggregate(zeroValue) (seqOp, combOp) |
和reduce() 相似, 但是通常 返回不同类型的函数 |
rdd.aggregate((0, 0)) ((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) |
(9,4) |
foreach(func) |
对RDD 中的每个元素使用给 定的函数 |
rdd.foreach(func) |
无 |
不同 RDD 的类型转换
Spark 中有些函数只能作用于特定类型的 RDD。例如 mean() 和 variance() 只能处理数值 RDD,join() 只能用于处理键值对 RDD。在 Scala 和 Java 中都没有与之对应的标准 RDD 类,故使用这些函数时必须要确保获得了正确的专用 RDD 类。(Scala 为隐式转换)
下表为 Java 中针对专门类型的函数接口:
DoubleFlatMapFunction
|
Function<T, Iterable
>
|
用于flatMapToDouble,以 生成DoubleRDD |
DoubleFunction
|
Function<T, Double> |
用于mapToDouble,以生成 DoubleRDD |
PairFlatMapFunction<T, K, V> |
Function<T, Iterable<Tuple2<K, V>>> |
用于flatMapToPair,以生 成PairRDD<K, V> |
PairFunction<T, K, V> |
Function<T, Tuple2<K, V>> |
用于mapToPair, 以生成 PairRDD<K, V> |
例程
以 DoubleFunction 为例:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;
import java.util.Arrays;
public class DoubleRDD {
public static void main(String[] args) {
SparkConf sparkConf=new SparkConf().setAppName("DoubleRDD");
JavaSparkContext javaSparkContext=new JavaSparkContext(sparkConf);
JavaRDD<Integer> rdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4));
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() {
public doubl
|