aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-10 00:34:08 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-10 00:34:08 -0800
commit8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798 (patch)
tree610da135db3784b648a4221a0ff280adbb92adaf
parent2578332f97d489bf0e238b2dbe1a3e1d0258a910 (diff)
downloadspark-8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798.tar.gz
spark-8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798.tar.bz2
spark-8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798.zip
Unpersist previous iterations in GraphLab
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala16
1 files changed, 10 insertions, 6 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
index 437288405f..94cfa7e126 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
@@ -102,6 +102,7 @@ object GraphLab extends Logging {
// Main Loop ---------------------------------------------------------------------
var i = 0
var numActive = activeGraph.numVertices
+ var prevActiveGraph: Graph[(Boolean, VD), ED] = null
while (i < numIter && numActive > 0) {
// Gather
@@ -109,22 +110,25 @@ object GraphLab extends Logging {
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
// Apply
- activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
-
-
+ val applied = activeGraph.outerJoinVertices(gathered)(apply)
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
- // activeGraph: Graph[(Boolean, VD), ED]
val scattered: RDD[(VertexID, Boolean)] =
- activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
+ applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
+ prevActiveGraph = activeGraph
activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
- // Calculate the number of active vertices
+ // Calculate the number of active vertices. The call to reduce() materializes the vertices of
+ // `activeGraph`, hiding the vertices of `prevActiveGraph`.
numActive = activeGraph.vertices.map{
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
logInfo("Number active vertices: " + numActive)
+
+ // Unpersist the RDDs hidden by newly-materialized RDDs
+ prevActiveGraph.unpersistVertices(blocking=false)
+
i += 1
}