aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-10-15 21:25:03 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-10-15 21:25:03 -0700
commitb5346064d6a2b6858e77b718d2fbadd691374282 (patch)
tree28da728d92d8e7dea96ec7c2587d4a566fe92324
parent6dbd2208ffe1df0b2a871f7f18b08bd825f61001 (diff)
parentfbe40c5806a01e38c56b12a09bc4b84681a99602 (diff)
downloadspark-b5346064d6a2b6858e77b718d2fbadd691374282.tar.gz
spark-b5346064d6a2b6858e77b718d2fbadd691374282.tar.bz2
spark-b5346064d6a2b6858e77b718d2fbadd691374282.zip
Merge pull request #8 from vchekan/checkpoint-ttl-restore
Serialize and restore spark.cleaner.ttl to savepoint In accordance to conversation in spark-dev maillist, preserve spark.cleaner.ttl parameter when serializing checkpoint.
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala4
2 files changed, 6 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2d8f072624..bb9febad38 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.MetadataCleaner
private[streaming]
@@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
+ val delaySeconds = MetadataCleaner.getDelaySeconds
def validate() {
assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705..098081d245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -100,6 +100,10 @@ class StreamingContext private (
"both SparkContext and checkpoint as null")
}
+ if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
+ MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
+ }
+
if (MetadataCleaner.getDelaySeconds < 0) {
throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")