diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-22 19:35:13 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-22 19:35:13 -0700 |
commit | f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6 (patch) | |
tree | 0ec23ac7f34a5810ed40c5270675eb08be08ecfb /streaming/src/main | |
parent | 2de573877fbed20092f1b3af20b603b30ba9a940 (diff) | |
download | spark-f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6.tar.gz spark-f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6.tar.bz2 spark-f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6.zip |
[streaming][SPARK-1578] Removed requirement for TTL in StreamingContext.
Since shuffles and RDDs that are out of context are automatically cleaned by Spark core (using ContextCleaner) there is no need for setting the cleaner TTL while creating a StreamingContext.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #491 from tdas/ttl-fix and squashes the following commits:
cf01dc7 [Tathagata Das] Removed requirement for TTL in StreamingContext.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 15 |
1 files changed, 1 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 6d9dc87a70..9ba6e02229 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -116,11 +116,6 @@ class StreamingContext private[streaming] ( } } - if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) { - throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " - + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") - } - private[streaming] val conf = sc.conf private[streaming] val env = SparkEnv.get @@ -500,8 +495,6 @@ class StreamingContext private[streaming] ( object StreamingContext extends Logging { - private[streaming] val DEFAULT_CLEANER_TTL = 3600 - implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } @@ -546,13 +539,7 @@ object StreamingContext extends Logging { def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls) private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { - // Set the default cleaner delay to an hour if not already set. - // This should be sufficient for even 1 second batch intervals. - if (MetadataCleaner.getDelaySeconds(conf) < 0) { - MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL) - } - val sc = new SparkContext(conf) - sc + new SparkContext(conf) } private[streaming] def createNewSparkContext( |