设为首页 加入收藏

TOP

Spark学习笔记3——RDD(下)(三)
2019-09-19 11:12:04 】 浏览:157
Tags:Spark 学习 笔记 RDD
vate double avg() { return total / (double) num; } public static void main(String[] args) { SparkConf sc = new SparkConf().setAppName("Contains"); JavaSparkContext javaSparkContext = new JavaSparkContext(sc); List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = javaSparkContext.parallelize(data); AvgCount initial = new AvgCount(0, 0); Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() { public AvgCount call(AvgCount a, Integer x) { a.total += x; a.num += 1; return a; } }; Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() { public AvgCount call(AvgCount a, AvgCount b) { a.total += b.total; a.num += b.num; return a; } }; AvgCount result = rdd.aggregate(initial, addAndCount, combine); System.out.println(result.avg()); } }

运行结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class AvgCount ~/Spark_RDD_Aggregate.jar
...
19/09/18 15:28:19 INFO DAGScheduler: Job 0 finished: aggregate at AvgCount.java:43, took 0.517385 s
3.0
...

常用的行动操作整理

函数名 目的 示例 结果
collect() 返回RDD 中的所有元素 rdd.collect() {1, 2, 3, 3}
count() RDD 中的元素个数 rdd.count() 4
countByValue() 各元素在RDD 中出现的次数 rdd.countByValue() {(1, 1),
(2, 1),
(3, 2)}
take(num) 从RDD 中返回num 个元素 rdd.take(2) {1, 2}
top(num) 从RDD 中返回最前面的num
个元素
rdd.top(2) {3, 3}
takeOrdered(num)
(ordering)
从RDD 中按照提供的顺序返
回最前面的num 个元素
rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(withReplace
ment, num, [seed])
从RDD 中返回任意一些元素 rdd.takeSample(false, 1) 非确定的
reduce(func) 并行整合RDD 中所有数据
(例如sum)
rdd.reduce((x, y) => x + y) 9
fold(zero)(func) 和reduce() 一样, 但是需要
提供初始值
rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)
(seqOp, combOp)
和reduce() 相似, 但是通常
返回不同类型的函数
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
(9,4)
foreach(func) 对RDD 中的每个元素使用给
定的函数
rdd.foreach(func)

不同 RDD 的类型转换

Spark 中有些函数只能作用于特定类型的 RDD。例如 mean() 和 variance() 只能处理数值 RDD,join() 只能用于处理键值对 RDD。在 Scala 和 Java 中都没有与之对应的标准 RDD 类,故使用这些函数时必须要确保获得了正确的专用 RDD 类。(Scala 为隐式转换)

下表为 Java 中针对专门类型的函数接口:

函数名 等价函数 用途
DoubleFlatMapFunction Function<T, Iterable > 用于flatMapToDouble,以
生成DoubleRDD
DoubleFunction Function<T, Double> 用于mapToDouble,以生成
DoubleRDD
PairFlatMapFunction<T, K, V> Function<T, Iterable<Tuple2<K, V>>> 用于flatMapToPair,以生
成PairRDD<K, V>
PairFunction<T, K, V> Function<T, Tuple2<K, V>> 用于mapToPair, 以生成
PairRDD<K, V>

例程

以 DoubleFunction 为例:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFunction;

import java.util.Arrays;

public class DoubleRDD {
    public static void main(String[] args) {
        SparkConf sparkConf=new SparkConf().setAppName("DoubleRDD");
        JavaSparkContext javaSparkContext=new JavaSparkContext(sparkConf);
        JavaRDD<Integer> rdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4));
        JavaDoubleRDD result = rdd.mapToDouble(
                new DoubleFunction<Integer>() {
                    public doubl
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇〈一〉ElasticSearch的介绍 下一篇centos7放行1521端口

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目