ID, avg)
// Per-movie average rating: (movieID, avg).
// The second field of each ratings record is used as the key and the third field is
// parsed as a Double score.
// A running (sum, count) pair is aggregated per key instead of groupByKey(), so partial
// results are combined on the map side and no per-movie group has to be buffered in memory.
val moviID_avg: RDD[(String, Double)] = Utils.ratingsRdd
  .map(rating => (rating._2, rating._3.toDouble))
  .aggregateByKey((0.0, 0L))(
    (acc, score) => (acc._1 + score, acc._2 + 1L),
    (left, right) => (left._1 + right._1, left._2 + right._2)
  )
  .mapValues { case (total, count) => total / count }
// Join movieID_year with moviID_avg, drop duplicate rows, then re-key by year:
// (movieID, (year, avg)) ---> (year, (movieID, avg))
val year_mocvieID_avg: RDD[(String, (String, Double))] = movieID_year
  .join(moviID_avg)
  .distinct()
  .map { case (movieId, (year, avg)) => (year, (movieId, avg)) }
// For every year keep the single movie with the highest average rating, then re-key by
// movie so the result can be joined with the movie table:
// (year, (movieID, avg)) ---> (year, (movieID, topavg)) ---> (movieID, (year, topavg))
// reduceByKey keeps only the current best candidate per year, so there is no need to
// materialise and sort the whole list of movies of a year just to read its first element.
// When several movies of a year share the top average exactly one of them is kept;
// which one is unspecified.
val year_movieID_topavg: RDD[(String, (String, Double))] = year_mocvieID_avg
  .reduceByKey((best, candidate) => if (candidate._2 > best._2) candidate else best)
  .map { case (year, (movieId, topAvg)) => (movieId, (year, topAvg)) }
// Attach the movie type (third field of each movie record) to every yearly winner and
// flatten the result: (movieID, (type, (year, topavg))) ---> (year, type, topavg).
// The output is sorted by year in descending order; year is a String, so the ordering
// is lexicographic.
val year_type_topavg: RDD[(String, String, Double)] = Utils.movieRdd
  .map(movie => (movie._1, movie._3))
  .join(year_movieID_topavg)
  .map { case (_, (movieType, (year, topAvg))) => (year, movieType, topAvg) }
  .sortBy(_._1, ascending = false)
// Print the result on the driver.
// RDD.foreach runs on the executors: on a cluster the lines would end up in executor
// logs, and the order established by sortBy is not preserved across partitions.
// The result holds at most one row per year, so collecting it to the driver is cheap
// and keeps the rows in sorted order.
year_type_topavg.collect().foreach(println)
}
}
|