diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-04-05 23:22:55 +0800 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-04-05 23:22:55 +0800 |
commit | 9eec3178352e9c95a66329182694c8fb0b66642a (patch) | |
tree | f54f8ff3715832893541c6f36d74a90d55b90dc6 | |
parent | 822d9c5b70f3ad8fad1bb23075d29246ff31a38d (diff) | |
download | spark-9eec3178352e9c95a66329182694c8fb0b66642a.tar.gz spark-9eec3178352e9c95a66329182694c8fb0b66642a.tar.bz2 spark-9eec3178352e9c95a66329182694c8fb0b66642a.zip |
Minor cleanup.
-rw-r--r-- | graph/src/main/scala/spark/graph/Analytics.scala | 19 | ||||
-rw-r--r-- | graph/src/main/scala/spark/graph/Graph.scala | 22 |
2 files changed, 19 insertions, 22 deletions
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala index d7be886daf..8f2844f34f 100644 --- a/graph/src/main/scala/spark/graph/Analytics.scala +++ b/graph/src/main/scala/spark/graph/Analytics.scala @@ -7,7 +7,6 @@ import spark.SparkContext._ object Analytics { - /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ @@ -23,7 +22,6 @@ object Analytics { numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } } - /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ @@ -40,7 +38,6 @@ object Analytics { numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) } } - /** * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD */ @@ -61,8 +58,6 @@ object Analytics { maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) } } - - /** * Compute the connected component membership of each vertex * and return an RDD with the vertex value containing the @@ -79,7 +74,6 @@ object Analytics { gatherDirection = EdgeDirection.Both) } - /** * Compute the shortest path to a set of markers */ @@ -210,8 +204,6 @@ object Analytics { // vertex => vertex.id < maxUser).vertices // } - - def main(args: Array[String]) = { val host = args(0) val taskType = args(1) @@ -223,9 +215,9 @@ object Analytics { } } - // System.setProperty("spark.serializer", "spark.KryoSerializer") - // //System.setProperty("spark.shuffle.compress", "false") - // System.setProperty("spark.kryo.registrator", "spark.graphlab.AnalyticsKryoRegistrator") + System.setProperty("spark.serializer", "spark.KryoSerializer") + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator") taskType match { case "pagerank" => { @@ -268,7 +260,7 @@ object Analytics { // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) // else Analytics.pagerank(graph, numIter) println("Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) - if(!outFname.isEmpty) { + if (!outFname.isEmpty) { println("Saving pageranks of pages to " + outFname) pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) } @@ -408,8 +400,5 @@ object Analytics { println("Invalid task type.") } } - } - - } diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala index 766ef15206..1db92217de 100644 --- a/graph/src/main/scala/spark/graph/Graph.scala +++ b/graph/src/main/scala/spark/graph/Graph.scala @@ -311,11 +311,17 @@ object Graph { /** * Load an edge list from file initializing the Graph RDD */ - def textFile[ED: ClassManifest](sc: SparkContext, fname: String, edgeParser: Array[String] => ED) = { + def textFile[ED: ClassManifest]( + sc: SparkContext, + path: String, + edgeParser: Array[String] => ED, + minEdgePartitions: Int = 1, + minVertexPartitions: Int = 1) + : Graph[Int, ED] = { // Parse the edge data table - val edges = sc.textFile(fname).flatMap { line => - if(!line.isEmpty && line(0) != '#') { + val edges = sc.textFile(path).flatMap { line => + if (!line.isEmpty && line(0) != '#') { val lineArray = line.split("\\s+") if(lineArray.length < 2) { println("Invalid line: " + line) @@ -326,13 +332,15 @@ object Graph { val tail = lineArray.drop(2) val edata = edgeParser(tail) Array(Edge(source.trim.toInt, target.trim.toInt, edata)) - } else { Array.empty[Edge[ED]] } + } else { + Array.empty[Edge[ED]] + } }.cache() val graph = fromEdges(edges) - println("Loaded graph:" + - "\n\t#edges: " + graph.numEdges + - "\n\t#vertices: " + graph.numVertices) + // println("Loaded graph:" + + // "\n\t#edges: " + graph.numEdges + + // "\n\t#vertices: " + graph.numVertices) graph } |