about | summary | refs | log | tree | commit | diff
path: root/graph/src/main
diff options
context:
space:
mode:
author Ankur Dave <ankurdave@gmail.com> 2013-12-12 15:43:12 -0800
committer Ankur Dave <ankurdave@gmail.com> 2013-12-12 15:43:12 -0800
commit 3f69cdc81b21f59024477b74571cfb683a0d3ca6 (patch)
tree d3f3224068e4f41aef0416010dfb97ab817305f2 /graph/src/main
parent a0fb477726f20c2c7eed0eed19008c3642a76da6 (diff)
download spark-3f69cdc81b21f59024477b74571cfb683a0d3ca6.tar.gz
spark-3f69cdc81b21f59024477b74571cfb683a0d3ca6.tar.bz2
spark-3f69cdc81b21f59024477b74571cfb683a0d3ca6.zip
Use standalone PageRank in Analytics
Diffstat (limited to 'graph/src/main')
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/Analytics.scala 31
1 files changed, 7 insertions, 24 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 755809b4b9..ac50e9a388 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -54,8 +54,6 @@ object Analytics extends Logging {
taskType match {
case "pagerank" => {
- var numIter = Int.MaxValue
- var isDynamic = false
var tol:Float = 0.001F
var outFname = ""
var numVPart = 4
@@ -63,8 +61,6 @@ object Analytics extends Logging {
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
@@ -73,40 +69,27 @@ object Analytics extends Logging {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
- if(!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
println("======================================")
println("| PageRank |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- if(isDynamic) println(" \t |-> Tolerance: " + tol)
- println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
+ minEdgePartitions = numEPart, partitionStrategy = partitionStrategy).cache()
- val startTime = System.currentTimeMillis
- println("GRAPHX: starting tasks")
println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
//val pr = Analytics.pagerank(graph, numIter)
- val pr = if(isDynamic) PageRank.runUntillConvergence(graph, tol, numIter)
- else PageRank.run(graph, numIter)
- println("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
+ val pr = PageRank.runStandalone(graph, tol)
+
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+
if (!outFname.isEmpty) {
- println("Saving pageranks of pages to " + outFname)
- pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ logWarning("Saving pageranks of pages to " + outFname)
+ pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
- println("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
-
- Thread.sleep(100000)
sc.stop()
}