Spark PageRank
Overview
If nodes with an out-degree of 0 (dangling nodes) are not considered, the method is easy; just follow the official example code. For handling zero-out-degree nodes there are two versions here: V2 is a revised and improved version of V1. V1 records the various problems I ran into, and V2, as far as I can tell, works correctly.
The detailed algorithm for handling zero-out-degree nodes can be found in the reference; the update rule it implements is summarised below.
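Both versions below implement the usual dangling-mass treatment: the rank sitting on zero-out-degree nodes is collected each iteration and redistributed uniformly over all N nodes. With damping factor d (the code uses d = 0.9, hence the constants 0.1 and 0.9), the per-node update the code performs is:
[plain]
PR_{t+1}(i) = (1 - d)/N + d * ( D_t/N + sum over j -> i of PR_t(j)/outdeg(j) )

where D_t = sum of PR_t(k) over all nodes k with outdeg(k) = 0   (the dangling mass)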
Data
[plain]
1 2
1 3
1 4
2 1
3 1
1 5
2 5
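As a quick sanity check, here is a minimal plain-Scala sketch (no Spark; the object name AdjacencySketch is only for illustration) that builds the same adjacency structure from this edge list, including the empty entries for the dangling nodes 4 and 5:
[plain]
object AdjacencySketch {
  def main(args: Array[String]): Unit = {
    val edges = Seq(("1", "2"), ("1", "3"), ("1", "4"), ("2", "1"), ("3", "1"), ("1", "5"), ("2", "5"))
    // group edges by source node: 1 -> (2,3,4,5), 2 -> (1,5), 3 -> (1)
    val links = edges.distinct.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }
    // nodes that only ever appear as targets have no out-edges: 4 and 5
    val dangling = edges.map(_._2).distinct.filterNot(links.contains)
    val full = links ++ dangling.map(_ -> Seq.empty[String])
    full.toSeq.sortBy(_._1).foreach { case (n, out) => println(n + " -> " + out.mkString("(", ",", ")")) }
    // prints: 1 -> (2,3,4,5)   2 -> (1,5)   3 -> (1)   4 -> ()   5 -> ()
  }
}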
V2: PageRank
[plain]
package myclass

import org.apache.spark.SparkContext
import SparkContext._

/**
 * Created by jack on 2/25/14.
 */
object MyAccumulator {
  def main(args: Array[String]) {
    val iters = 20
    val sc = new SparkContext("local", "My PageRank", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val lines = sc.textFile("src/main/resources/data/pagerank_data.txt", 1)
    // build the adjacency list from the edge data, e.g. (1,(2,3,4,5)) (2,(1,5)) ...
    val links = lines.map(line => {
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }).distinct().groupByKey()
    // add adjacency entries for zero-out-degree nodes, e.g. (4,()) (5,()) ...
    val nodes = scala.collection.mutable.ArrayBuffer.empty[String] ++ links.keys.collect()
    val newNodes = scala.collection.mutable.ArrayBuffer[String]()
    for {s <- links.values.collect()
         k <- s if !nodes.contains(k)
    } {
      nodes += k
      newNodes += k
    }
    val linkList = links ++ sc.parallelize(for (i <- newNodes) yield (i, List.empty[String]))
    val nodeSize = linkList.count()
    var ranks = linkList.mapValues(v => 1.0 / nodeSize)
    // iterate the PageRank update
    for (i <- 1 to iters) {
      val dangling = sc.accumulator(0.0)
      val contribs = linkList.join(ranks).values.flatMap {
        case (urls, rank) => {
          val size = urls.size
          if (size == 0) {
            // dangling node: its whole rank goes into the dangling mass
            dangling += rank
            List()
          } else {
            urls.map(url => (url, rank / size))
          }
        }
      }
      // Without the next line the accumulated dangling mass stays 0; with contribs.first
      // instead, dangling would only hold the value accumulated in a single partition.
      contribs.count()
      val danglingValue = dangling.value
      ranks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
        0.1 * (1.0 / nodeSize) + 0.9 * (danglingValue / nodeSize + p)
      )
      println("------------------------------" + i + "---------------------------------")
      ranks.foreach(s => println(s._1 + " - " + s._2))
    }
  }
}

The main idea is to use an accumulator to record the dangling mass; the points to watch are noted in the code comments. Also, the dangling value cannot be used directly through dangling.value inside an RDD operation: when the accumulator appears in code executed by an action, it is copied to each slice (partition) and updated there, and the partial values are only merged back once execution finishes. That is why the local variable danglingValue is used to capture dangling's value before it enters the PR computation.
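The same pattern in isolation, as a minimal standalone sketch (using the same pre-1.x accumulator API as the code above; the object name and sample numbers are made up): the accumulator is only filled once an action forces the tasks to run, and its value should be copied into a local val on the driver before being used in another transformation, so that the closure captures a plain Double rather than the accumulator itself.
[plain]
import org.apache.spark.SparkContext

object AccumulatorSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Accumulator sketch")
    val acc = sc.accumulator(0.0)
    val data = sc.parallelize(Seq(0.1, 0.2, 0.3), 2)

    // map is a transformation, so nothing has executed yet and acc is still 0.0
    val doubled = data.map { x => acc += x; x * 2 }.cache()
    println(acc.value)        // 0.0

    doubled.count()           // the action runs the tasks; per-partition sums get merged
    val total = acc.value     // read the merged value once, on the driver...
    println(total)            // ~0.6

    // ...and use the captured local value inside further transformations
    doubled.map(_ / total).collect().foreach(println)
  }
}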
Iteration results
The number of iterations is fixed at 20; the loop could also terminate once the difference between successive rank vectors drops below a threshold (a sketch of such a check follows the results below).
[plain]
4 - 0.15702615478678728
2 - 0.15702615478678728
5 - 0.22768787421485948
1 - 0.30123366142477936
3 - 0.15702615478678728
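The ranks above sum to 1 (up to rounding), as expected once the dangling mass is redistributed. As for the stopping condition, a rough sketch of a threshold-based check, reusing sc, linkList and nodeSize from the V2 code above (the value of epsilon is arbitrary):
[plain]
var ranks = linkList.mapValues(v => 1.0 / nodeSize)
val epsilon = 1e-6                  // arbitrary convergence threshold
var delta = Double.MaxValue
var i = 0

while (delta > epsilon) {
  i += 1
  val dangling = sc.accumulator(0.0)
  val contribs = linkList.join(ranks).values.flatMap {
    case (urls, rank) =>
      if (urls.isEmpty) { dangling += rank; List() }
      else urls.map(url => (url, rank / urls.size))
  }
  contribs.count()                  // force the tasks so the accumulator is filled
  val danglingValue = dangling.value
  val newRanks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
    0.1 * (1.0 / nodeSize) + 0.9 * (danglingValue / nodeSize + p)
  )
  // L1 distance between successive rank vectors
  delta = newRanks.join(ranks).values.map { case (a, b) => math.abs(a - b) }.sum()
  ranks = newRanks
  println("iteration " + i + ", delta = " + delta)
}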
V1: PageRank
Various problems
The code first, then the explanation.
[plain]
package myclass

import org.apache.spark.SparkContext
import SparkContext._
import scala.collection.mutable.ArrayBuffer

/**
 * Created by jack on 2/22/14.
 */
object MyPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    val iters = args(2).toInt
    val sc = new SparkContext(args(0), "My PageRank", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // PageRank without handling zero-out-degree nodes
    /* val lines = sc.textFile(args(1), 1)
    val links = lines.map(line => {
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }).distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)
    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap {
        case (urls, rank) => {
          val size = urls.size
          urls.map(url => (url, rank / size))
        }
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }
    val output = ranks.collect
    val urlSize = output.length
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 / output.length + ".")) */
    // version that handles zero-out-degree nodes
    val lines = sc.textFile(args(1), 1)
    val linkF = lines.map(line => {
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }).distinct().groupByKey()
    var linkS = linkF
    var nodes = linkF.keys.collect()
    val newNodes = ArrayBuffer[String]()
    for {s <- linkF.values.collect()
         k <- s if !nodes.contains(k)
    } {
      nodes = nodes :+ k
      newNodes += k
    }
    linkS = linkS ++ sc.makeRDD(for (i <- newNodes) yield (i, ArrayBuffer[String]()))
    val linkT = linkS
    val nodeSize = linkS.count()
    var ranks = linkT.mapValues(v => 1.0 / nodeSize)
    for (i <- 1 to iters) {
      var dangling = 0.0
      val linksAndPR = linkT.join(ranks).values
      // sum the PR of zero-out-degree nodes on the driver (see Problem 1 below)
      for (i <- linksAndPR.filter(_._1.size == 0).collect()) {
        dangling += i._2
      }
      val contribs = linksAndPR.filter(_._1.size != 0).flatMap {
        case (urls, rank) => {
          val size = urls.size
          urls.map(url => (url, rank / size))
        }
      }
      ranks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
        0.1 * (1.0 / nodeSize) + 0.9 * (dangling / nodeSize + p)
      )
      println("------------------------------" + i + "---------------------------------")
      ranks.foreach(s => println(s._1 + " - " + s._2))
    }
  }
}

The following problems all concern the handling of zero-out-degree nodes:
Problem 1: The variable dangling is used to accumulate the total PR value of all zero-out-degree nodes, and the crux is how to update it. Updating it from inside RDD operations such as foreach has no effect; only a plain for loop over the collected data works, as sketched just below.
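A minimal sketch of this pitfall (the numbers are only illustrative): the closure passed to foreach is shipped to the tasks and updates its own copy of the variable, whereas a for loop over collect() runs entirely on the driver, so there the update is visible.
[plain]
var total = 0.0
val rdd = sc.parallelize(Seq(1.0, 2.0, 3.0))

// runs inside the tasks: each task updates its own copy of total
rdd.foreach(x => total += x)
println(total)                // still 0.0 on the driver

// runs on the driver: the data is collected first, so the update sticks
for (x <- rdd.collect()) total += x
println(total)                // 6.0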
Problem 2: To address Problem 1, I tried an accumulator, but an accumulator's value can only be read once the computation has finished (a guess: much like Hadoop counters, there is no globally consistent value during the job; Spark presumably updates it per partition and merges the results at the end), so an accumulator alone does not solve the problem.
Problem 3: linksAndPR has to be filtered twice; merging the two passes into one caused problems.
Problem 4: The version that handles zero-out-degree nodes has only been run locally; it may break when run distributed, again because of the dangling value. Something like the Hadoop approach with two jobs is probably needed, but I am not familiar enough with Spark's job mechanism yet, so this is left for later. A possible alternative is sketched below.
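One possible way around this, without either a driver-side variable or an accumulator, would be to compute the dangling mass with its own action in every iteration. A hedged sketch of the idea, reusing linkT, ranks, nodeSize and iters from the V1 code above (the join is cached so the two passes over it do not recompute it):
[plain]
for (i <- 1 to iters) {
  val linksAndPR = linkT.join(ranks).values.cache()

  // dangling mass: total rank of nodes with no out-links, obtained by a separate action
  val danglingValue = linksAndPR.filter(_._1.isEmpty).map(_._2).sum()

  // regular contributions from nodes that do have out-links
  val contribs = linksAndPR.filter(_._1.nonEmpty).flatMap {
    case (urls, rank) => urls.map(url => (url, rank / urls.size))
  }

  ranks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
    0.1 * (1.0 / nodeSize) + 0.9 * (danglingValue / nodeSize + p)
  )
}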