diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-09 23:34:35 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-09 23:34:35 -0800 |
commit | 2578332f97d489bf0e238b2dbe1a3e1d0258a910 (patch) | |
tree | a3f3ffcce656f6eb95d1265ed8cf1059868104d5 | |
parent | 8ae108f6c48528f3bb7498d586eb51a70c043764 (diff) | |
download | spark-2578332f97d489bf0e238b2dbe1a3e1d0258a910.tar.gz spark-2578332f97d489bf0e238b2dbe1a3e1d0258a910.tar.bz2 spark-2578332f97d489bf0e238b2dbe1a3e1d0258a910.zip |
Add Graph.unpersistVertices()
3 files changed, 18 insertions, 8 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 2b7c0a2583..6f2d19d0da 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -94,6 +94,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { def cache(): Graph[VD, ED] /** + * Uncache only the vertices of this graph, leaving the edges alone. This is useful because most + * graph operations modify the vertices but reuse the edges. + */ + def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] + + /** * Repartition the edges in the graph according to partitionStrategy. */ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index ed8733a806..0af230ed29 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -106,23 +106,21 @@ object Pregel { // Update the graph with the new vertices. prevG = g g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } - g.vertices.cache() + g.cache() val oldMessages = messages // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't // get to send messages. We must cache messages so it can be materialized on the next line, // allowing us to uncache the previous iteration. messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache() - // Materializes messages, newVerts, and g.rvv (which materializes g.vertices). Hides - // oldMessages (depended on by newVerts), newVerts (depended on by messages), prevG.vertices - // (depended on by newVerts and g.vertices), and prevG.rvv (depended on by oldMessages and - // g.rvv). + // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This + // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the + // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g). activeMessages = messages.count() - // Unpersist hidden RDDs + // Unpersist the RDDs hidden by newly-materialized RDDs oldMessages.unpersist(blocking=false) newVerts.unpersist(blocking=false) - prevG.vertices.unpersist(blocking=false) - prevG.asInstanceOf[org.apache.spark.graphx.impl.GraphImpl[VD, ED]].replicatedVertexView.unpersist(blocking=false) + prevG.unpersistVertices(blocking=false) // count the iteration i += 1 } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index be9f188150..2dd1324d4f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -65,6 +65,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY) + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { + vertices.unpersist(blocking) + replicatedVertexView.unpersist(blocking) + this + } + override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { val numPartitions = edges.partitions.size val edTag = classTag[ED] |