aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala5
1 files changed, 5 insertions, 0 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70a99b33d7..ef0bb2ac13 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -72,6 +72,7 @@ object Bagel extends Logging {
var verts = vertices
var msgs = messages
var noActivity = false
+ var lastRDD: RDD[(K, (V, Array[M]))] = null
do {
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+ if (lastRDD != null) {
+ lastRDD.unpersist(false)
+ }
+ lastRDD = processed
val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))