aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-12 17:36:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-12 17:36:49 -0800
commitd1820fef574e8f559d8fba3995e21216033be303 (patch)
treebaeab5a0e06cce68da9927940401d642e6013115 /streaming/src/test
parent448aef6790caa3728bcc43f518afb69807597c39 (diff)
parentc7fabb745b26b42bb4a4609edcb43019fcbd68aa (diff)
downloadspark-d1820fef574e8f559d8fba3995e21216033be303.tar.gz
spark-d1820fef574e8f559d8fba3995e21216033be303.tar.bz2
spark-d1820fef574e8f559d8fba3995e21216033be303.zip
Merge branch 'error-handling' into dstream-move
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala26
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala5
2 files changed, 21 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index e0232c70a8..a4d0f9f978 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc.stop()
ssc = null
}
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
}
test("from no conf constructor") {
@@ -125,6 +129,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register
+ ssc.start()
ssc.stop()
ssc.stop()
ssc = null
@@ -132,12 +138,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
+ sc = ssc.sparkContext
+ addInputStream(ssc).register
+ ssc.start()
ssc.stop(false)
ssc = null
assert(sc.makeRDD(1 to 100).collect().size === 100)
+ ssc = new StreamingContext(sc, batchDuration)
}
- test("waitForStop") {
+ test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register
@@ -149,13 +159,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
// test whether waitForStop() exits after give amount of time
failAfter(1000 millis) {
- ssc.waitForStop(500)
+ ssc.awaitTermination(500)
}
// test whether waitForStop() does not exit if not time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
- ssc.waitForStop()
+ ssc.awaitTermination()
throw new Exception("Did not wait for stop")
}
}
@@ -169,11 +179,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc.stop()
}
}.start()
- ssc.waitForStop()
+ ssc.awaitTermination()
}
}
- test("waitForStop with error in task") {
+ test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x})
@@ -181,19 +191,19 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
val exception = intercept[Exception] {
ssc.start()
- ssc.waitForStop(5000)
+ ssc.awaitTermination(5000)
}
assert(exception.getMessage.contains("map task"), "Expected exception not thrown")
}
- test("waitForStop with error in job generation") {
+ test("awaitTermination with error in job generation") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
val exception = intercept[TestException] {
ssc.start()
- ssc.waitForStop(5000)
+ ssc.awaitTermination(5000)
}
assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 75093d6106..d56dd1abe3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- Thread.sleep(10)
+ ssc.awaitTermination(50)
}
val timeTaken = System.currentTimeMillis() - startTime
-
+ logInfo("Output generated in " + timeTaken + " milliseconds")
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")