aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test')
-rw-r--r--	streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala	2
-rw-r--r--	streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala	45
-rw-r--r--	streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala	1
3 files changed, 13 insertions, 35 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 46b7f63b65..3bad871b5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -143,7 +143,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
- // TODO: This test makes assumptions about Thread.sleep() and is flaky
+ // TODO: This test works in IntelliJ but not through SBT
ignore("actor input stream") {
// Start the server
val testServer = new TestServer()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 6d14b1f785..3e2b25af84 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -38,15 +38,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val batchDuration = Milliseconds(500)
val sparkHome = "someDir"
val envPair = "key" -> "value"
- val ttl = StreamingContext.DEFAULT_CLEANER_TTL + 100
var sc: SparkContext = null
var ssc: StreamingContext = null
- before {
- System.clearProperty("spark.cleaner.ttl")
- }
-
after {
if (ssc != null) {
ssc.stop()
@@ -62,67 +57,51 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(master, appName, batchDuration)
assert(ssc.sparkContext.conf.get("spark.master") === master)
assert(ssc.sparkContext.conf.get("spark.app.name") === appName)
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
test("from no conf + spark home") {
ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil)
assert(ssc.conf.get("spark.home") === sparkHome)
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
test("from no conf + spark home + env") {
ssc = new StreamingContext(master, appName, batchDuration,
sparkHome, Nil, Map(envPair))
assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
- assert(MetadataCleaner.getDelaySeconds(ssc.sparkContext.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
- }
-
- test("from conf without ttl set") {
- val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- ssc = new StreamingContext(myConf, batchDuration)
- assert(MetadataCleaner.getDelaySeconds(ssc.conf) ===
- StreamingContext.DEFAULT_CLEANER_TTL)
}
- test("from conf with ttl set") {
+ test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
- test("from existing SparkContext without ttl set") {
+ test("from existing SparkContext") {
sc = new SparkContext(master, appName)
- val exception = intercept[SparkException] {
- ssc = new StreamingContext(sc, batchDuration)
- }
- assert(exception.getMessage.contains("ttl"))
+ ssc = new StreamingContext(sc, batchDuration)
}
- test("from existing SparkContext with ttl set") {
+ test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", ttl.toString)
+ myConf.set("spark.cleaner.ttl", "10")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
- assert(MetadataCleaner.getDelaySeconds(cp.sparkConf) === ttl)
+ assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(MetadataCleaner.getDelaySeconds(newCp.sparkConf) === ttl)
+ assert(newCp.sparkConf.getInt("spark.cleaner.ttl", -1) === 10)
ssc = new StreamingContext(null, newCp, null)
- assert(MetadataCleaner.getDelaySeconds(ssc.conf) === ttl)
+ assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
}
test("start and stop state check") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index aa2d5c2fc2..4f63fd3782 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -137,7 +137,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val conf = new SparkConf()
.setMaster(master)
.setAppName(framework)
- .set("spark.cleaner.ttl", StreamingContext.DEFAULT_CLEANER_TTL.toString)
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )