aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala45
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala1
4 files changed, 14 insertions, 49 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6d9dc87a70..9ba6e02229 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -116,11 +116,6 @@ class StreamingContext private[streaming] (
}
}
- if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
- throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
- + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
- }
-
private[streaming] val conf = sc.conf
private[streaming] val env = SparkEnv.get
@@ -500,8 +495,6 @@ class StreamingContext private[streaming] (
object StreamingContext extends Logging {
- private[streaming] val DEFAULT_CLEANER_TTL = 3600
-
implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
@@ -546,13 +539,7 @@ object StreamingContext extends Logging {
def jarOfClass(cls: Class[_]): Option[String] = SparkContext.jarOfClass(cls)
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
- // Set the default cleaner delay to an hour if not already set.
- // This should be sufficient for even 1 second batch intervals.
- if (MetadataCleaner.getDelaySeconds(conf) < 0) {
- MetadataCleaner.setDelaySeconds(conf, DEFAULT_CLEANER_TTL)
- }
- val sc = new SparkContext(conf)
- sc
+ new SparkContext(conf)
}
private[streaming] def createNewSparkContext(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 46b7f63b65..3bad871b5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
- // TODO: This test makes assumptions about Thread.sleep() and is flaky
+ // TODO: This test works in IntelliJ but not through SBT
ignore("actor input stream") {
// Start the server
val testServer = new TestServer()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6d14b1f785..3e2b25af84 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"
- val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
var sc: SparkContext = null
var ssc: StreamingContext = null
- before {
- System.clearProperty("spark.cleaner.ttl")
- }
-
after {
if (ssc != null) {
ssc.stop()
@@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
assert(ssc.sparkContext.conf.get("spark.master") === master)
assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
test("from no conf + spark home") {
ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
assert(ssc.conf.get("spark.home") === sparkHome)
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
test("from no conf + spark home + env") {
ssc = new StreamingContext(master, appName, batchDuration,
sparkHome, Nil, Map(envPair))
assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
- }
-
- test("from conf without ttl set") {
- val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- ssc = new StreamingContext(myConf, batchDuration)
- assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
- test("from conf with ttl set") {
+ test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
- test("from existing SparkContext without ttl set") {
+ test("from existing SparkContext") {
sc = new SparkContext(master, appName)
- val exception = intercept[SparkException] {
- ssc = new StreamingContext(sc, batchDuration)
- }
- assert(exception.getMessage.contains("ttl"))
+ ssc = new StreamingContext(sc, batchDuration)
}
- test("from existing SparkContext with ttl set") {
+ test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
- assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+ assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+ assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
ssc = new StreamingContext(null, newCp, null)
- assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
test("start and stop state check") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index aa2d5c2fc2..4f63fd3782 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val conf = new SparkConf()
.setMaster(master)
.setAppName(framework)
- .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )