diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-10 01:53:28 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-10 01:53:28 -0800 |
commit | 729277ebc41f77103ffbfca55942d85bc5ac4800 (patch) | |
tree | 3283e6fdd205d3d400442414a946adabdf689d39 | |
parent | 4cc550909a693ac9954a9d7327347353d9513049 (diff) | |
download | spark-729277ebc41f77103ffbfca55942d85bc5ac4800.tar.gz spark-729277ebc41f77103ffbfca55942d85bc5ac4800.tar.bz2 spark-729277ebc41f77103ffbfca55942d85bc5ac4800.zip |
Undo 8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798
Getting unpersist right in GraphLab is tricky.
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala | 10 |
1 files changed, 3 insertions, 7 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala index 94cfa7e126..7efc69c64e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala @@ -110,25 +110,21 @@ object GraphLab extends Logging { activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) // Apply - val applied = activeGraph.outerJoinVertices(gathered)(apply) + val applied = activeGraph.outerJoinVertices(gathered)(apply).cache() // Scatter is basically a gather in the opposite direction so we reverse the edge direction val scattered: RDD[(VertexID, Boolean)] = applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) prevActiveGraph = activeGraph - activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() + activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache() - // Calculate the number of active vertices. The call to reduce() materializes the vertices of - // `activeGraph`, hiding the vertices of `prevActiveGraph`. + // Calculate the number of active vertices. numActive = activeGraph.vertices.map{ case (vid, data) => if (data._1) 1 else 0 }.reduce(_ + _) logInfo("Number active vertices: " + numActive) - // Unpersist the RDDs hidden by newly-materialized RDDs - prevActiveGraph.unpersistVertices(blocking=false) - i += 1 } |