-rw-r--r--  core/src/main/scala/spark/RDD.scala                       | 31
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala   |  3
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala    |  9
3 files changed, 17 insertions, 26 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8af6c9bd6a..fbfcfbd704 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -211,28 +211,17 @@ abstract class RDD[T: ClassManifest](
if (startCheckpoint) {
val rdd = this
- val env = SparkEnv.get
-
- // Spawn a new thread to do the checkpoint as it takes sometime to write the RDD to file
- val th = new Thread() {
- override def run() {
- // Save the RDD to a file, create a new HadoopRDD from it,
- // and change the dependencies from the original parents to the new RDD
- SparkEnv.set(env)
- rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
- rdd.saveAsObjectFile(checkpointFile)
- rdd.synchronized {
- rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size)
- rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
- rdd.changeDependencies(rdd.checkpointRDD)
- rdd.shouldCheckpoint = false
- rdd.isCheckpointInProgress = false
- rdd.isCheckpointed = true
- println("Done checkpointing RDD " + rdd.id + ", " + rdd)
- }
- }
+ rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
+ rdd.saveAsObjectFile(checkpointFile)
+ rdd.synchronized {
+ rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size)
+ rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
+ rdd.changeDependencies(rdd.checkpointRDD)
+ rdd.shouldCheckpoint = false
+ rdd.isCheckpointInProgress = false
+ rdd.isCheckpointed = true
+ println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
}
- th.start()
} else {
// Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked
dependencies.foreach(_.rdd.doCheckpoint())
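
This hunk removes the background checkpointing thread: the RDD is now written out, re-read, and its dependencies rewired synchronously on the calling thread, so the captured SparkEnv no longer has to be re-installed on a spawned thread. A minimal, self-contained sketch of the two patterns, using a hypothetical ThreadLocal `env` standing in for SparkEnv (illustrative names only, not Spark code):

```scala
object InlineVsThreadedWork {
  private val env = new ThreadLocal[String]

  // Pattern the old code used: capture the caller's thread-local environment,
  // re-install it on a freshly spawned thread, and return without waiting.
  def threaded(work: () => Unit): Thread = {
    val captured = env.get()
    val th = new Thread() {
      override def run(): Unit = {
        env.set(captured)   // hand the environment over to the new thread
        work()
      }
    }
    th.start()
    th                      // caller continues before the work finishes
  }

  // Pattern the new code uses: run the work inline on the calling thread,
  // so no environment hand-off is needed and the call is synchronous.
  def inline(work: () => Unit): Unit = work()
}
```

The inline form also means the method does not return until the checkpoint file exists and the state flags have been flipped, instead of leaving that to happen at some later point on another thread.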
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index 9bcc9245c0..52f03784db 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap
 * threshold time can then be removed using the cleanup method. This is intended to be a drop-in
* replacement of scala.collection.mutable.HashMap.
*/
-class TimeStampedHashMap[A, B] extends Map[A, B]() {
+class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
val internalMap = new ConcurrentHashMap[A, (B, Long)]()
def get(key: A): Option[B] = {
@@ -79,6 +79,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() {
while(iterator.hasNext) {
val entry = iterator.next()
if (entry.getValue._2 < threshTime) {
+ logDebug("Removing key " + entry.getKey)
iterator.remove()
}
}
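
The only change in this file is mixing in spark.Logging and logging each key as it is dropped; the surrounding cleanup loop (iterate the ConcurrentHashMap and remove entries whose stored timestamp is older than the threshold) is unchanged. A standalone sketch of that pattern, with assumed names rather than the real spark.util.TimeStampedHashMap:

```scala
import java.util.concurrent.ConcurrentHashMap

class TimestampedCache[A, B] {
  private val internal = new ConcurrentHashMap[A, (B, Long)]()

  def put(key: A, value: B): Unit =
    internal.put(key, (value, System.currentTimeMillis))

  def get(key: A): Option[B] =
    Option(internal.get(key)).map(_._1)

  // Drop every entry whose insertion timestamp is older than threshTime,
  // reporting each removed key (println stands in for logDebug here).
  def cleanup(threshTime: Long): Unit = {
    val it = internal.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (entry.getValue._2 < threshTime) {
        println("Removing key " + entry.getKey)
        it.remove()
      }
    }
  }
}
```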
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 28a3e2dfc7..d2e9de110e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -182,14 +182,15 @@ extends Serializable with Logging {
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
)
- val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble
+ val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
+ logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
- metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000,
+ metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
- "delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " +
- "the Java property 'spark.cleanup.delay' to more than " +
+ "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
+ "the Java property 'spark.cleaner.delay' to more than " +
math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
)
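
The assertion now reads the cleaner delay via MetadataCleaner.getDelaySeconds (seconds) instead of the old spark.cleanup.delay property (minutes), so the comparison multiplies by 1000 to reach milliseconds, the error message divides by 60.0 to report minutes, and the suggested property name becomes spark.cleaner.delay. A small self-contained sketch of that unit arithmetic, with assumed names and values (not the DStream code itself):

```scala
object CleanerDelayCheck {
  // rememberDurationMs is in milliseconds, cleanerDelaySeconds in seconds;
  // a negative delay means metadata cleanup is disabled, so the check passes.
  def check(rememberDurationMs: Long, cleanerDelaySeconds: Long): Unit = {
    assert(
      cleanerDelaySeconds < 0 || rememberDurationMs < cleanerDelaySeconds * 1000L,
      "remember duration of " + rememberDurationMs + " ms exceeds the metadata cleaner delay of " +
        (cleanerDelaySeconds / 60.0) + " minutes; set 'spark.cleaner.delay' to more than " +
        math.ceil(rememberDurationMs.toDouble / 60000.0).toInt + " minutes"
    )
  }

  def main(args: Array[String]): Unit = {
    check(rememberDurationMs = 60000, cleanerDelaySeconds = 3600)  // ok: 1 minute < 60 minutes
    check(rememberDurationMs = 60000, cleanerDelaySeconds = -1)    // ok: cleanup disabled
  }
}
```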