aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-09 23:34:35 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-09 23:34:35 -0800
commit2578332f97d489bf0e238b2dbe1a3e1d0258a910 (patch)
treea3f3ffcce656f6eb95d1265ed8cf1059868104d5
parent8ae108f6c48528f3bb7498d586eb51a70c043764 (diff)
downloadspark-2578332f97d489bf0e238b2dbe1a3e1d0258a910.tar.gz
spark-2578332f97d489bf0e238b2dbe1a3e1d0258a910.tar.bz2
spark-2578332f97d489bf0e238b2dbe1a3e1d0258a910.zip
Add Graph.unpersistVertices()
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala14
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala6
3 files changed, 18 insertions, 8 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 2b7c0a2583..6f2d19d0da 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -94,6 +94,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
def cache(): Graph[VD, ED]
/**
+ * Uncache only the vertices of this graph, leaving the edges alone. This is useful because most
+ * graph operations modify the vertices but reuse the edges.
+ */
+ def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
+
+ /**
* Repartition the edges in the graph according to partitionStrategy.
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index ed8733a806..0af230ed29 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -106,23 +106,21 @@ object Pregel {
// Update the graph with the new vertices.
prevG = g
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
- g.vertices.cache()
+ g.cache()
val oldMessages = messages
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
// get to send messages. We must cache messages so it can be materialized on the next line,
// allowing us to uncache the previous iteration.
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
- // Materializes messages, newVerts, and g.rvv (which materializes g.vertices). Hides
- // oldMessages (depended on by newVerts), newVerts (depended on by messages), prevG.vertices
- // (depended on by newVerts and g.vertices), and prevG.rvv (depended on by oldMessages and
- // g.rvv).
+ // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
+ // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
+ // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
activeMessages = messages.count()
- // Unpersist hidden RDDs
+ // Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
- prevG.vertices.unpersist(blocking=false)
- prevG.asInstanceOf[org.apache.spark.graphx.impl.GraphImpl[VD, ED]].replicatedVertexView.unpersist(blocking=false)
+ prevG.unpersistVertices(blocking=false)
// count the iteration
i += 1
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index be9f188150..2dd1324d4f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
+ vertices.unpersist(blocking)
+ replicatedVertexView.unpersist(blocking)
+ this
+ }
+
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
val numPartitions = edges.partitions.size
val edTag = classTag[ED]