aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-13 18:45:46 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-13 21:02:37 -0800
commitd4d9ece1af258ccdc83afbb8ba26345d7af16422 (patch)
tree118a4a56d99fa81cd966a5a90094c5658f6ae329 /graphx
parentee8931d2c6503716de640d6d1249c515e1fd85d3 (diff)
downloadspark-d4d9ece1af258ccdc83afbb8ba26345d7af16422.tar.gz
spark-d4d9ece1af258ccdc83afbb8ba26345d7af16422.tar.bz2
spark-d4d9ece1af258ccdc83afbb8ba26345d7af16422.zip
Remove Graph.statistics and GraphImpl.printLineage
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala7
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala65
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala6
3 files changed, 1 insertions, 77 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 1e3f3895de..7e99276d25 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -91,11 +91,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
/**
- * Computes statistics describing the graph representation.
- */
- def statistics: Map[String, Any]
-
- /**
* Transforms each vertex attribute in the graph using the map function.
*
* @note The new graph has the same structure. As a consequence the underlying index structures
@@ -254,7 +249,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
/**
- * Computes statistics about the neighboring edges and vertices of each vertex. The user supplied
+ * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
* `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
* "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
* the map phase destined to each vertex.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 348490c186..12d46a809c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -83,71 +83,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
GraphImpl(vertices, newEdges)
}
- override def statistics: Map[String, Any] = {
- // Get the total number of vertices after replication, used to compute the replication ratio.
- def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = {
- vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
- }
-
- val numVertices = this.ops.numVertices
- val numEdges = this.ops.numEdges
- val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices
- val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices
- val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices
- // One entry for each partition, indicate the total number of edges on that partition.
- val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
- val minLoad = loadArray.min
- val maxLoad = loadArray.max
- Map(
- "Num Vertices" -> numVertices,
- "Num Edges" -> numEdges,
- "Replication (both)" -> replicationRatioBoth,
- "Replication (src only)" -> replicationRatioSrcOnly,
- "Replication (dest only)" -> replicationRatioDstOnly,
- "Load Array" -> loadArray,
- "Min Load" -> minLoad,
- "Max Load" -> maxLoad)
- }
-
- /**
- * Display the lineage information for this graph.
- */
- def printLineage() = {
- def traverseLineage(
- rdd: RDD[_],
- indent: String = "",
- visited: Map[Int, String] = Map.empty[Int, String]) {
- if (visited.contains(rdd.id)) {
- println(indent + visited(rdd.id))
- println(indent)
- } else {
- val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
- val cacheLevel = rdd.getStorageLevel
- val name = rdd.id
- val deps = rdd.dependencies
- val partitioner = rdd.partitioner
- val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
- println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner +
- ", " + numparts +")")
- println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
- println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
- deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
- }
- }
- println("edges ------------------------------------------")
- traverseLineage(edges, " ")
- var visited = Map(edges.id -> "edges")
- println("\n\nvertices ------------------------------------------")
- traverseLineage(vertices, " ", visited)
- visited += (vertices.id -> "vertices")
- println("\n\nroutingTable.bothAttrs -------------------------------")
- traverseLineage(routingTable.bothAttrs, " ", visited)
- visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs")
- println("\n\ntriplets ----------------------------------------")
- traverseLineage(triplets, " ", visited)
- println(visited)
- } // end of printLineage
-
override def reverse: Graph[VD, ED] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 08256dccb2..2f4d6d6864 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -73,9 +73,6 @@ object PageRank extends Logging {
.mapVertices( (id, attr) => 1.0 )
.cache()
- // Display statistics about pagerank
- logInfo(pagerankGraph.statistics.toString)
-
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
@@ -121,9 +118,6 @@ object PageRank extends Logging {
.mapVertices( (id, attr) => (0.0, 0.0) )
.cache()
- // Display statistics about pagerank
- logInfo(pagerankGraph.statistics.toString)
-
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {