Spark PageRank

Overview

If nodes with out-degree 0 are ignored, the method is easy; just follow the official example code. For handling out-degree-0 nodes there are two versions below: V2 is a revised and polished version of V1, V1 records the various mistakes I ran into along the way, and V2, as far as I can tell, works correctly.

For the concrete algorithm that handles nodes with out-degree 0 (dangling nodes), see the reference.
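For orientation, the update rule implemented in both versions below is the usual PageRank iteration with the dangling mass spread uniformly over all nodes. A sketch of the formula (with damping factor $d = 0.9$ and $N$ the total number of nodes, matching the constants 0.9 and 0.1 in the code; $D_t$ is the rank mass sitting on out-degree-0 nodes at iteration $t$):

$$PR_{t+1}(p) = \frac{1-d}{N} + d\left(\frac{D_t}{N} + \sum_{q \to p} \frac{PR_t(q)}{|\mathrm{out}(q)|}\right), \qquad D_t = \sum_{|\mathrm{out}(q)| = 0} PR_t(q)$$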

Data

[plain]

1 2
1 3
1 4
2 1
3 1
1 5
2 5

V2: PageRank

[plain]

package myclass

import org.apache.spark.SparkContext
import SparkContext._

/**
 * Created by jack on 2/25/14.
 */
object MyAccumulator {
  def main(args: Array[String]) {
    val iters = 20
    val sc = new SparkContext("local", "My PageRank", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val lines = sc.textFile("src/main/resources/data/pagerank_data.txt", 1)
    // Build the adjacency list from the edge data, e.g. (1,(2,3,4,5)) (2,(1,5)) ...
    var links = lines.map(line => {
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }).distinct().groupByKey()
    // Add adjacency-list entries for nodes with out-degree 0, e.g. (4,()) (5,()) ...
    val nodes = scala.collection.mutable.ArrayBuffer.empty ++ links.keys.collect()
    val newNodes = scala.collection.mutable.ArrayBuffer[String]()
    for {s <- links.values.collect()
         k <- s if (!nodes.contains(k))
    } {
      nodes += k
      newNodes += k
    }
    val linkList = links ++ sc.parallelize(for (i <- newNodes) yield (i, List.empty))
    val nodeSize = linkList.count()
    var ranks = linkList.mapValues(v => 1.0 / nodeSize)
    // Iteratively compute the PR values
    for (i <- 1 to iters) {
      val dangling = sc.accumulator(0.0)
      val contribs = linkList.join(ranks).values.flatMap {
        case (urls, rank) => {
          val size = urls.size
          if (size == 0) {
            // Dangling node: add its rank to the accumulator and emit no contributions
            dangling += rank
            List()
          } else {
            urls.map(url => (url, rank / size))
          }
        }
      }
      // Without the line below the accumulated dangling value would be 0; with contribs.first
      // it would only equal the value aggregated from a single slice
      contribs.count()
      val danglingValue = dangling.value
      ranks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
        0.1 * (1.0 / nodeSize) + 0.9 * (danglingValue / nodeSize + p)
      )
      println("------------------------------" + i + "---------------------------------")
      ranks.foreach(s => println(s._1 + " - " + s._2))
    }
  }
}

The main point is using an accumulator to record the dangling mass; the spots that need care are noted in the code comments. Also, the dangling value cannot be used directly via dangling.value inside a Spark action: when an accumulator appears in code executed by an action, it is copied to each slice (partition), updated there, and the partial values are only merged after execution finishes. That is why the local variable danglingValue is used to capture dangling's value before it goes into the PR computation.
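As a minimal standalone sketch of this pattern (illustrative names, and the same pre-Spark-2.0 accumulator API as above, so treat it as a sketch rather than a drop-in snippet): the accumulator is only populated once an action forces the transformation to run, and its value should be read on the driver into a plain local variable before being used in further RDD operations.

[plain]

package myclass

import org.apache.spark.SparkContext
import SparkContext._

// Illustrative sketch of the accumulator pattern used in MyAccumulator above
object AccumulatorSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Accumulator Sketch")
    val acc = sc.accumulator(0.0)
    val data = sc.parallelize(Seq(0.1, 0.2, 0.3, 0.4), 2)
    // The update lives inside a lazy transformation; cache so reusing `doubled`
    // does not increment the accumulator a second time
    val doubled = data.map { x => acc += x; x * 2 }.cache()
    // An action must run before acc holds anything
    doubled.count()
    // Read the value on the driver and capture it in a plain variable;
    // do not call acc.value inside code that runs on the executors
    val total = acc.value
    val scaled = doubled.map(x => x / total)
    println(scaled.collect().mkString(", "))
    sc.stop()
  }
}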

Iteration results

The run above uses a fixed 20 iterations; alternatively, iteration could stop once the change between successive rank vectors falls below a threshold (see the sketch after the results below).

[plain]

4 - 0.15702615478678728
2 - 0.15702615478678728
5 - 0.22768787421485948
1 - 0.30123366142477936
3 - 0.15702615478678728
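As mentioned above, a hedged sketch of threshold-based termination in place of the fixed 20-iteration loop (tol and the delta computation are illustrative; linkList, ranks, nodeSize and sc are assumed to be set up exactly as in the V2 program):

[plain]

// Sketch only: stop once the total change in ranks falls below `tol`
val tol = 1e-6
var delta = Double.MaxValue
var iter = 0
while (delta > tol) {
  iter += 1
  val dangling = sc.accumulator(0.0)
  val contribs = linkList.join(ranks).values.flatMap {
    case (urls, rank) =>
      if (urls.isEmpty) { dangling += rank; List() }
      else urls.map(url => (url, rank / urls.size))
  }
  contribs.count() // force evaluation so the accumulator is filled
  val danglingValue = dangling.value
  val newRanks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
    0.1 * (1.0 / nodeSize) + 0.9 * (danglingValue / nodeSize + p)
  )
  // Total absolute difference between the old and new rank vectors
  delta = ranks.join(newRanks).values.map { case (a, b) => math.abs(a - b) }.sum()
  ranks = newRanks
  println("iteration " + iter + ", delta = " + delta)
}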

V1: PageRank

Problems encountered

The code first, then the explanation.

[plain]

package myclass

import org.apache.spark.SparkContext
import SparkContext._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

/**
 * Created by jack on 2/22/14.
 */
object MyPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    val iters = args(2).toInt
    val sc = new SparkContext(args(0), "My PageRank", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    // PageRank without handling nodes of out-degree 0
    /*
    val lines = sc.textFile(args(1), 1)
    val links = lines.map(line => {
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }).distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)
    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap {
        case (urls, rank) => {
          val size = urls.size
          urls.map(url => (url, rank / size))
        }
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }
    val output = ranks.collect
    val urlSize = output.length
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 / output.length + "."))
    */
    // PageRank that handles nodes with out-degree 0
    val lines = sc.textFile(args(1), 1)
    val linkF = lines.map(line => {
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }).distinct().groupByKey()
    var linkS = linkF
    var nodes = linkF.keys.collect()
    var newNodes = scala.collection.mutable.ArrayBuffer[String]()
    for {s <- linkF.values.collect()
         k <- s if (!nodes.contains(k))
    } {
      nodes = nodes :+ k
      newNodes += k
    }
    // Append empty adjacency lists for the nodes that never appear as a source
    linkS = linkS ++ sc.makeRDD(for (i <- newNodes) yield (i, ArrayBuffer[String]()))
    val linkT = linkS
    val nodeSize = linkS.count()
    var ranks = linkT.mapValues(v => 1.0 / nodeSize)
    for (i <- 1 to iters) {
      var dangling = 0.0
      val linksAndPR = linkT.join(ranks).values
      // Sum the PR of dangling nodes on the driver (see Problem 1 below)
      for (i <- linksAndPR.filter(_._1.size == 0).collect()) {
        dangling += i._2
      }
      val contribs = linksAndPR.filter(_._1.size != 0).flatMap {
        case (urls, rank) => {
          val size = urls.size
          urls.map(url => (url, rank / size))
        }
      }
      ranks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
        0.1 * (1.0 / nodeSize) + 0.9 * (dangling / nodeSize + p)
      )
      println("------------------------------" + i + "---------------------------------")
      ranks.foreach(s => println(s._1 + " - " + s._2))
    }
  }
}

The following problems all concern the handling of nodes with out-degree 0:

Problem 1: the dangling variable is meant to sum the PR values of all nodes with out-degree 0, and the crux is how to update it. Updating dangling from inside RDD operations such as foreach has no effect; only a plain for loop over collected data works.
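A minimal illustration of why this happens (illustrative snippet, not taken from the program above; it assumes the SparkContext sc is in scope): the closure passed to foreach is shipped to the executors, so each task increments its own copy of the captured variable and the value held by the driver is left untouched.

[plain]

// Illustrative only: the closure captures a copy of `dangling` per task
var dangling = 0.0
val danglingRanks = sc.parallelize(Seq(0.2, 0.3), 2)
danglingRanks.foreach(r => dangling += r) // mutates the executors' copies only
println(dangling)                         // typically still 0.0

// What does work: bring the (small) data back to the driver, then sum locally
for (r <- danglingRanks.collect()) {
  dangling += r
}
println(dangling)                         // 0.5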

Problem 2: to address Problem 1, I tried an accumulator, but an accumulator's value can only be read after the job has finished (my guess: much like a Hadoop counter, there is no globally consistent value during execution; Spark probably updates it per task and merges the results at the end), so an accumulator alone did not solve the problem.

Problem 3: linksAndPR had to be filtered twice; merging the two passes into one caused problems.

Problem 4: the version that handles out-degree-0 nodes only runs on a single machine; it will probably break in a distributed setting, again because of the dangling mass. Something like the Hadoop approach with two jobs is probably needed, but I am not yet familiar with Spark's job mechanics, so this is left for later.
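One possible direction for making this cluster-safe without a driver-side mutable variable (a sketch under the same V1 names linkT, ranks and nodeSize; untested, and it still makes two filter passes, so Problem 3 remains): compute the dangling mass with a distributed aggregation such as sum(), which combines partial sums on the executors and returns the result to the driver.

[plain]

// Sketch only: distributed aggregation of the dangling mass inside the iteration loop
val linksAndPR = linkT.join(ranks).values.cache()
val dangling = linksAndPR.filter(_._1.isEmpty).map(_._2).sum() // summed on the executors
val contribs = linksAndPR.filter(_._1.nonEmpty).flatMap {
  case (urls, rank) => urls.map(url => (url, rank / urls.size))
}
ranks = contribs.reduceByKey(_ + _).mapValues[Double](p =>
  0.1 * (1.0 / nodeSize) + 0.9 * (dangling / nodeSize + p)
)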