aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-22 19:35:13 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-22 19:35:13 -0700
commitf3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6 (patch)
tree0ec23ac7f34a5810ed40c5270675eb08be08ecfb /streaming
parent2de573877fbed20092f1b3af20b603b30ba9a940 (diff)
downloadspark-f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6.tar.gz
spark-f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6.tar.bz2
spark-f3d19a9f1a4e38ff9fb5bf78e04ed5d321219bf6.zip
[streaming][SPARK-1578] Removed requirement for TTL in StreamingContext.
Since shuffles and RDDs that are out of context are automatically cleaned by Spark core (using ContextCleaner), there is no need to set the cleaner TTL while creating a StreamingContext.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #491 from tdas/ttl-fix and squashes the following commits:

cf01dc7 [Tathagata Das] Removed requirement for TTL in StreamingContext.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala45
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala1
4 files changed, 14 insertions, 49 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6d9dc87a70..9ba6e02229 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,11 +116,6 @@ class StreamingContext private[streaming] (
}
}
- if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
- throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
- + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
- }
-
private[streaming] val conf = sc.conf
private[streaming] val env = SparkEnv.get
@@ -500,8 +495,6 @@ class StreamingContext private[streaming] (
object StreamingContext extends Logging {
- private[streaming] val DEFAULT_CLEANER_TTL = 3600
-
implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
@@ -546,13 +539,7 @@ object StreamingContext extends Logging {
def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
- // Set the default cleaner delay to an hour if not already set.
- // This should be sufficient for even 1 second batch intervals.
- if (MetadataCleaner.getDelaySeconds(conf) < 0) {
- MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL)
- }
- val sc = new SparkContext(conf)
- sc
+ new SparkContext(conf)
}
private[streaming] def createNewSparkContext(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 46b7f63b65..3bad871b5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
- // TODO: This test makes assumptions about Thread.sleep() and is flaky
+ // TODO: This test works in IntelliJ but not through SBT
ignore("actor input stream") {
// Start the server
val testServer = new TestServer()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6d14b1f785..3e2b25af84 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"
- val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
var sc: SparkContext = null
var ssc: StreamingContext = null
- before {
- System.clearProperty("spark.cleaner.ttl")
- }
-
after {
if (ssc != null) {
ssc.stop()
@@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
assert(ssc.sparkContext.conf.get("spark.master") === master)
assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
test("from no conf + spark home") {
ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
assert(ssc.conf.get("spark.home") === sparkHome)
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
test("from no conf + spark home + env") {
ssc = new StreamingContext(master, appName, batchDuration,
sparkHome, Nil, Map(envPair))
assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
- }
-
- test("from conf without ttl set") {
- val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- ssc = new StreamingContext(myConf, batchDuration)
- assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
- test("from conf with ttl set") {
+ test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
- test("from existing SparkContext without ttl set") {
+ test("from existing SparkContext") {
sc = new SparkContext(master, appName)
- val exception = intercept[SparkException] {
- ssc = new StreamingContext(sc, batchDuration)
- }
- assert(exception.getMessage.contains("ttl"))
+ ssc = new StreamingContext(sc, batchDuration)
}
- test("from existing SparkContext with ttl set") {
+ test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
- assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+ assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+ assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
ssc = new StreamingContext(null, newCp, null)
- assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
test("start and stop state check") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index aa2d5c2fc2..4f63fd3782 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val conf = new SparkConf()
.setMaster(master)
.setAppName(framework)
- .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )