diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 18:45:46 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-13 21:02:37 -0800 |
commit | d4d9ece1af258ccdc83afbb8ba26345d7af16422 (patch) | |
tree | 118a4a56d99fa81cd966a5a90094c5658f6ae329 /graphx/src | |
parent | ee8931d2c6503716de640d6d1249c515e1fd85d3 (diff) | |
download | spark-d4d9ece1af258ccdc83afbb8ba26345d7af16422.tar.gz spark-d4d9ece1af258ccdc83afbb8ba26345d7af16422.tar.bz2 spark-d4d9ece1af258ccdc83afbb8ba26345d7af16422.zip |
Remove Graph.statistics and GraphImpl.printLineage
Diffstat (limited to 'graphx/src')
3 files changed, 1 insertions, 77 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 1e3f3895de..7e99276d25 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -91,11 +91,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] /** - * Computes statistics describing the graph representation. - */ - def statistics: Map[String, Any] - - /** * Transforms each vertex attribute in the graph using the map function. * * @note The new graph has the same structure. As a consequence the underlying index structures @@ -254,7 +249,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] /** - * Computes statistics about the neighboring edges and vertices of each vertex. The user supplied + * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of * the map phase destined to each vertex. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 348490c186..12d46a809c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -83,71 +83,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( GraphImpl(vertices, newEdges) } - override def statistics: Map[String, Any] = { - // Get the total number of vertices after replication, used to compute the replication ratio. - def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = { - vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble - } - - val numVertices = this.ops.numVertices - val numEdges = this.ops.numEdges - val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices - val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices - val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices - // One entry for each partition, indicate the total number of edges on that partition. - val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges) - val minLoad = loadArray.min - val maxLoad = loadArray.max - Map( - "Num Vertices" -> numVertices, - "Num Edges" -> numEdges, - "Replication (both)" -> replicationRatioBoth, - "Replication (src only)" -> replicationRatioSrcOnly, - "Replication (dest only)" -> replicationRatioDstOnly, - "Load Array" -> loadArray, - "Min Load" -> minLoad, - "Max Load" -> maxLoad) - } - - /** - * Display the lineage information for this graph. - */ - def printLineage() = { - def traverseLineage( - rdd: RDD[_], - indent: String = "", - visited: Map[Int, String] = Map.empty[Int, String]) { - if (visited.contains(rdd.id)) { - println(indent + visited(rdd.id)) - println(indent) - } else { - val locs = rdd.partitions.map( p => rdd.preferredLocations(p) ) - val cacheLevel = rdd.getStorageLevel - val name = rdd.id - val deps = rdd.dependencies - val partitioner = rdd.partitioner - val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0} - println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + - ", " + numparts +")") - println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString) - println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", ")) - deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited)) - } - } - println("edges ------------------------------------------") - traverseLineage(edges, " ") - var visited = Map(edges.id -> "edges") - println("\n\nvertices ------------------------------------------") - traverseLineage(vertices, " ", visited) - visited += (vertices.id -> "vertices") - println("\n\nroutingTable.bothAttrs -------------------------------") - traverseLineage(routingTable.bothAttrs, " ", visited) - visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs") - println("\n\ntriplets ----------------------------------------") - traverseLineage(triplets, " ", visited) - println(visited) - } // end of printLineage - override def reverse: Graph[VD, ED] = { val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 08256dccb2..2f4d6d6864 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -73,9 +73,6 @@ object PageRank extends Logging { .mapVertices( (id, attr) => 1.0 ) .cache() - // Display statistics about pagerank - logInfo(pagerankGraph.statistics.toString) - // Define the three functions needed to implement PageRank in the GraphX // version of Pregel def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = @@ -121,9 +118,6 @@ object PageRank extends Logging { .mapVertices( (id, attr) => (0.0, 0.0) ) .cache() - // Display statistics about pagerank - logInfo(pagerankGraph.statistics.toString) - // Define the three functions needed to implement PageRank in the GraphX // version of Pregel def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { |