diff options
author | Ankur Dave <ankurdave@gmail.com> | 2013-12-12 15:43:12 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2013-12-12 15:43:12 -0800 |
commit | 3f69cdc81b21f59024477b74571cfb683a0d3ca6 (patch) | |
tree | d3f3224068e4f41aef0416010dfb97ab817305f2 /graph/src/main | |
parent | a0fb477726f20c2c7eed0eed19008c3642a76da6 (diff) | |
download | spark-3f69cdc81b21f59024477b74571cfb683a0d3ca6.tar.gz spark-3f69cdc81b21f59024477b74571cfb683a0d3ca6.tar.bz2 spark-3f69cdc81b21f59024477b74571cfb683a0d3ca6.zip |
Use standalone PageRank in Analytics
Diffstat (limited to 'graph/src/main')
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/Analytics.scala | 31 |
1 files changed, 7 insertions, 24 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 755809b4b9..ac50e9a388 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -54,8 +54,6 @@ object Analytics extends Logging { taskType match { case "pagerank" => { - var numIter = Int.MaxValue - var isDynamic = false var tol:Float = 0.001F var outFname = "" var numVPart = 4 @@ -63,8 +61,6 @@ object Analytics extends Logging { var partitionStrategy: PartitionStrategy = RandomVertexCut options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean case ("tol", v) => tol = v.toFloat case ("output", v) => outFname = v case ("numVPart", v) => numVPart = v.toInt @@ -73,40 +69,27 @@ object Analytics extends Logging { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } - if(!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } println("======================================") println("| PageRank |") - println("--------------------------------------") - println(" Using parameters:") - println(" \tDynamic: " + isDynamic) - if(isDynamic) println(" \t |-> Tolerance: " + tol) - println(" \tNumIter: " + numIter) println("======================================") val sc = new SparkContext(host, "PageRank(" + fname + ")") val graph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache() + minEdgePartitions = numEPart, partitionStrategy = partitionStrategy).cache() - val startTime = System.currentTimeMillis - println("GRAPHX: starting tasks") println("GRAPHX: Number of vertices " + graph.vertices.count) println("GRAPHX: Number of edges " + graph.edges.count) //val pr = Analytics.pagerank(graph, numIter) - val pr = if(isDynamic) PageRank.runUntillConvergence(graph, tol, numIter) - else PageRank.run(graph, numIter)
- println("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) ) + val pr = PageRank.runStandalone(graph, tol) + + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_)) + if (!outFname.isEmpty) { - println("Saving pageranks of pages to " + outFname) - pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + logWarning("Saving pageranks of pages to " + outFname) + pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) } - println("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") - - Thread.sleep(100000) sc.stop() } |