From b4dba55f78b0dfda728cf69c9c17e4863010d28d Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 2 Dec 2012 02:03:05 +0000
Subject: Made RDD checkpoint not create a new thread. Fixed bug in detecting
 when spark.cleaner.delay is insufficient.

---
 core/src/main/scala/spark/RDD.scala                | 31 +++++++---------------
 .../main/scala/spark/util/TimeStampedHashMap.scala |  3 ++-
 .../src/main/scala/spark/streaming/DStream.scala   |  9 ++++---
 3 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8af6c9bd6a..fbfcfbd704 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -211,28 +211,17 @@ abstract class RDD[T: ClassManifest](
 
     if (startCheckpoint) {
       val rdd = this
-      val env = SparkEnv.get
-
-      // Spawn a new thread to do the checkpoint as it takes sometime to write the RDD to file
-      val th = new Thread() {
-        override def run() {
-          // Save the RDD to a file, create a new HadoopRDD from it,
-          // and change the dependencies from the original parents to the new RDD
-          SparkEnv.set(env)
-          rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
-          rdd.saveAsObjectFile(checkpointFile)
-          rdd.synchronized {
-            rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size)
-            rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
-            rdd.changeDependencies(rdd.checkpointRDD)
-            rdd.shouldCheckpoint = false
-            rdd.isCheckpointInProgress = false
-            rdd.isCheckpointed = true
-            println("Done checkpointing RDD " + rdd.id + ", " + rdd)
-          }
-        }
+      rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
+      rdd.saveAsObjectFile(checkpointFile)
+      rdd.synchronized {
+        rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size)
+        rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
+        rdd.changeDependencies(rdd.checkpointRDD)
+        rdd.shouldCheckpoint = false
+        rdd.isCheckpointInProgress = false
+        rdd.isCheckpointed = true
+        println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
       }
-      th.start()
     } else {
       // Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked
       dependencies.foreach(_.rdd.doCheckpoint())
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index 9bcc9245c0..52f03784db 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap
  * threshold time can them be removed using the cleanup method. This is intended to be a drop-in
  * replacement of scala.collection.mutable.HashMap.
  */
-class TimeStampedHashMap[A, B] extends Map[A, B]() {
+class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
   val internalMap = new ConcurrentHashMap[A, (B, Long)]()
 
   def get(key: A): Option[B] = {
@@ -79,6 +79,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() {
     while(iterator.hasNext) {
       val entry = iterator.next()
       if (entry.getValue._2 < threshTime) {
+        logDebug("Removing key " + entry.getKey)
         iterator.remove()
       }
     }
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 28a3e2dfc7..d2e9de110e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -182,14 +182,15 @@ extends Serializable with Logging {
       checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
     )
 
-    val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble
+    val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
+    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
     assert(
-      metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000,
+      metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
       "It seems you are doing some DStream window operation or setting a checkpoint interval " +
       "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
       "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
-      "delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " +
-      "the Java property 'spark.cleanup.delay' to more than " +
+      "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
+      "the Java property 'spark.cleaner.delay' to more than " +
       math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
     )
 
-- 
cgit v1.2.3
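
For context on the first change: the deleted code captured `val env = SparkEnv.get` and re-installed it with `SparkEnv.set(env)` because SparkEnv lives in a thread-local, and a plain `new Thread` does not inherit thread-locals from the thread that created it. The following is a minimal, self-contained Scala sketch of that JVM behavior; `ThreadLocalDemo` and its `env` field are hypothetical stand-ins, not Spark code.

object ThreadLocalDemo {
  // Stand-in for SparkEnv's thread-local slot (hypothetical, not the real SparkEnv).
  private val env = new ThreadLocal[String]()

  def main(args: Array[String]) {
    env.set("driver-env")
    val captured = env.get()      // capture on the parent thread, like `val env = SparkEnv.get`
    val th = new Thread() {
      override def run() {
        println(env.get())        // prints "null": plain ThreadLocals are not inherited
        env.set(captured)         // the pattern the deleted `SparkEnv.set(env)` line followed
        println(env.get())        // prints "driver-env"
      }
    }
    th.start()
    th.join()
  }
}

Running the checkpoint inline on the calling thread, as the patch now does, makes that propagation step unnecessary.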
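For context on the second change: the old check read the stale property name 'spark.cleanup.delay' and, per its own error message, treated the value as minutes, while `spark.util.MetadataCleaner.getDelaySeconds` reports the actual cleaner delay in seconds, so the bound must be `delay * 1000` milliseconds rather than `delay * 60 * 1000`. Below is a runnable sketch of the corrected comparison; `CleanerDelayCheck` and its method name are hypothetical, not the Spark API.

object CleanerDelayCheck {
  // Mirrors the corrected assert condition: delay is in seconds,
  // rememberDuration in milliseconds; a negative delay disables cleanup.
  def delayIsSufficient(rememberDurationMs: Long, cleanerDelaySeconds: Long): Boolean =
    cleanerDelaySeconds < 0 || rememberDurationMs < cleanerDelaySeconds * 1000

  def main(args: Array[String]) {
    val rememberMs = 10 * 60 * 1000L   // DStream must remember RDDs for 10 minutes
    val delaySeconds = 300L            // cleaner delay of 5 minutes (300 s)

    println(delayIsSufficient(rememberMs, delaySeconds))   // false: the assert now trips
    // Interpreting the same value as minutes, as the old bound did, masks the problem:
    println(rememberMs < delaySeconds * 60 * 1000)         // true: insufficiency undetected
  }
}

Reading a seconds value as minutes inflates the apparent delay 60-fold, which is how an insufficient spark.cleaner.delay could previously go undetected.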