author     Tathagata Das <tathagata.das1565@gmail.com>  2014-04-22 19:35:13 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-04-22 19:35:13 -0700
commit     f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6 (patch)
tree       0ec23ac7f34a5810ed40c5270675eb08be08ecfb /streaming/src/main
parent     2de573877fbed20092f1b3af20b603b30ba9a940 (diff)
[streaming][SPARK-1578] Removed requirement for TTL in StreamingContext.
Since shuffles and RDDs that go out of scope are automatically cleaned up by Spark core (via ContextCleaner), there is no longer any need to set the cleaner TTL when creating a StreamingContext.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #491 from tdas/ttl-fix and squashes the following commits:

cf01dc7 [Tathagata Das] Removed requirement for TTL in StreamingContext.
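For illustration (not part of the commit), a minimal sketch of what this change permits: a StreamingContext built from a plain SparkConf with no spark.cleaner.ttl set, relying on Spark core's ContextCleaner to reclaim out-of-scope RDDs and shuffle data. The object name, master, app name, and socket host/port below are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NoTtlExample {
  def main(args: Array[String]): Unit = {
    // No spark.cleaner.ttl here; before this change, constructing a
    // StreamingContext without it threw a SparkException.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NoTtlExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Illustrative source and output operation (host/port are placeholders).
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}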
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 15
1 file changed, 1 insertion(+), 14 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6d9dc87a70..9ba6e02229 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,11 +116,6 @@ class StreamingContext private[streaming] (
     }
   }
 
-  if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
-    throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
-      + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
-  }
-
   private[streaming] val conf = sc.conf
 
   private[streaming] val env = SparkEnv.get
@@ -500,8 +495,6 @@ class StreamingContext private[streaming] (
 object StreamingContext extends Logging {
 
-  private[streaming] val DEFAULT_CLEANER_TTL = 3600
-
   implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
@@ -546,13 +539,7 @@ object StreamingContext extends Logging {
   def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
 
   private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
-    // Set the default cleaner delay to an hour if not already set.
-    // This should be sufficient for even 1 second batch intervals.
-    if (MetadataCleaner.getDelaySeconds(conf) < 0) {
-      MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL)
-    }
-    val sc = new SparkContext(conf)
-    sc
+    new SparkContext(conf)
   }
 
   private[streaming] def createNewSparkContext(