about summary refs log tree commit diff
diff options
context:
space:
mode:
author Reynold Xin <rxin@cs.berkeley.edu> 2013-04-05 23:22:55 +0800
committer Reynold Xin <rxin@cs.berkeley.edu> 2013-04-05 23:22:55 +0800
commit 9eec3178352e9c95a66329182694c8fb0b66642a (patch)
tree f54f8ff3715832893541c6f36d74a90d55b90dc6
parent 822d9c5b70f3ad8fad1bb23075d29246ff31a38d (diff)
download spark-9eec3178352e9c95a66329182694c8fb0b66642a.tar.gz
spark-9eec3178352e9c95a66329182694c8fb0b66642a.tar.bz2
spark-9eec3178352e9c95a66329182694c8fb0b66642a.zip
Minor cleanup.
-rw-r--r-- graph/src/main/scala/spark/graph/Analytics.scala 19
-rw-r--r-- graph/src/main/scala/spark/graph/Graph.scala 22
2 files changed, 19 insertions, 22 deletions
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala
index d7be886daf..8f2844f34f 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/spark/graph/Analytics.scala
@@ -7,7 +7,6 @@ import spark.SparkContext._
object Analytics {
-
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
@@ -23,7 +22,6 @@ object Analytics {
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
-
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
@@ -40,7 +38,6 @@ object Analytics {
numIter).mapVertices{ case Vertex(id, (outDeg, r)) => Vertex(id, r) }
}
-
/**
* Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
*/
@@ -61,8 +58,6 @@ object Analytics {
maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
}
-
-
/**
* Compute the connected component membership of each vertex
* and return an RDD with the vertex value containing the
@@ -79,7 +74,6 @@ object Analytics {
gatherDirection = EdgeDirection.Both)
}
-
/**
* Compute the shortest path to a set of markers
*/
@@ -210,8 +204,6 @@ object Analytics {
// vertex => vertex.id < maxUser).vertices
// }
-
-
def main(args: Array[String]) = {
val host = args(0)
val taskType = args(1)
@@ -223,9 +215,9 @@ object Analytics {
}
}
- // System.setProperty("spark.serializer", "spark.KryoSerializer")
- // //System.setProperty("spark.shuffle.compress", "false")
- // System.setProperty("spark.kryo.registrator", "spark.graphlab.AnalyticsKryoRegistrator")
+ System.setProperty("spark.serializer", "spark.KryoSerializer")
+ //System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
taskType match {
case "pagerank" => {
@@ -268,7 +260,7 @@ object Analytics {
// val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
// else Analytics.pagerank(graph, numIter)
println("Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
- if(!outFname.isEmpty) {
+ if (!outFname.isEmpty) {
println("Saving pageranks of pages to " + outFname)
pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
@@ -408,8 +400,5 @@ object Analytics {
println("Invalid task type.")
}
}
-
}
-
-
}
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 766ef15206..1db92217de 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -311,11 +311,17 @@ object Graph {
/**
* Load an edge list from file initializing the Graph RDD
*/
- def textFile[ED: ClassManifest](sc: SparkContext, fname: String, edgeParser: Array[String] => ED) = {
+ def textFile[ED: ClassManifest](
+ sc: SparkContext,
+ path: String,
+ edgeParser: Array[String] => ED,
+ minEdgePartitions: Int = 1,
+ minVertexPartitions: Int = 1)
+ : Graph[Int, ED] = {
// Parse the edge data table
- val edges = sc.textFile(fname).flatMap { line =>
- if(!line.isEmpty && line(0) != '#') {
+ val edges = sc.textFile(path).flatMap { line =>
+ if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if(lineArray.length < 2) {
println("Invalid line: " + line)
@@ -326,13 +332,15 @@ object Graph {
val tail = lineArray.drop(2)
val edata = edgeParser(tail)
Array(Edge(source.trim.toInt, target.trim.toInt, edata))
- } else { Array.empty[Edge[ED]] }
+ } else {
+ Array.empty[Edge[ED]]
+ }
}.cache()
val graph = fromEdges(edges)
- println("Loaded graph:" +
- "\n\t#edges: " + graph.numEdges +
- "\n\t#vertices: " + graph.numVertices)
+ // println("Loaded graph:" +
+ // "\n\t#edges: " + graph.numEdges +
+ // "\n\t#vertices: " + graph.numVertices)
graph
}