aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-10 01:53:28 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-10 01:53:28 -0800
commit729277ebc41f77103ffbfca55942d85bc5ac4800 (patch)
tree3283e6fdd205d3d400442414a946adabdf689d39
parent4cc550909a693ac9954a9d7327347353d9513049 (diff)
downloadspark-729277ebc41f77103ffbfca55942d85bc5ac4800.tar.gz
spark-729277ebc41f77103ffbfca55942d85bc5ac4800.tar.bz2
spark-729277ebc41f77103ffbfca55942d85bc5ac4800.zip
Undo 8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798
Getting unpersist right in GraphLab is tricky.
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala10
1 files changed, 3 insertions, 7 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
index 94cfa7e126..7efc69c64e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
@@ -110,25 +110,21 @@ object GraphLab extends Logging {
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
// Apply
- val applied = activeGraph.outerJoinVertices(gathered)(apply)
+ val applied = activeGraph.outerJoinVertices(gathered)(apply).cache()
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
val scattered: RDD[(VertexID, Boolean)] =
applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
prevActiveGraph = activeGraph
- activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
+ activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache()
- // Calculate the number of active vertices. The call to reduce() materializes the vertices of
- // `activeGraph`, hiding the vertices of `prevActiveGraph`.
+ // Calculate the number of active vertices.
numActive = activeGraph.vertices.map{
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
logInfo("Number active vertices: " + numActive)
- // Unpersist the RDDs hidden by newly-materialized RDDs
- prevActiveGraph.unpersistVertices(blocking=false)
-
i += 1
}