From 729277ebc41f77103ffbfca55942d85bc5ac4800 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 10 Jan 2014 01:53:28 -0800 Subject: Undo 8b6b8ac87f6ffb92b3395344bf2696d5c7fb3798 Getting unpersist right in GraphLab is tricky. --- graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala index 94cfa7e126..7efc69c64e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala @@ -110,25 +110,21 @@ object GraphLab extends Logging { activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) // Apply - val applied = activeGraph.outerJoinVertices(gathered)(apply) + val applied = activeGraph.outerJoinVertices(gathered)(apply).cache() // Scatter is basically a gather in the opposite direction so we reverse the edge direction val scattered: RDD[(VertexID, Boolean)] = applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) prevActiveGraph = activeGraph - activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() + activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache() - // Calculate the number of active vertices. The call to reduce() materializes the vertices of - // `activeGraph`, hiding the vertices of `prevActiveGraph`. + // Calculate the number of active vertices. numActive = activeGraph.vertices.map{ case (vid, data) => if (data._1) 1 else 0 }.reduce(_ + _) logInfo("Number active vertices: " + numActive) - // Unpersist the RDDs hidden by newly-materialized RDDs - prevActiveGraph.unpersistVertices(blocking=false) - i += 1 } -- cgit v1.2.3