aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-12 17:21:13 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-12 17:21:13 -0800
commitc7fabb745b26b42bb4a4609edcb43019fcbd68aa (patch)
tree900bb0ed4ea2bf44eb3a6db8fb695a63275232fe /streaming/src/test
parent7883b8f5798e3de6f55a1182a5d5775c4aaa783b (diff)
downloadspark-c7fabb745b26b42bb4a4609edcb43019fcbd68aa.tar.gz
spark-c7fabb745b26b42bb4a4609edcb43019fcbd68aa.tar.bz2
spark-c7fabb745b26b42bb4a4609edcb43019fcbd68aa.zip
Changed StreamingContext.stopForWait to awaitTermination.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala16
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala2
2 files changed, 9 insertions, 9 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 10c18a7f7e..a477d200c9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -146,7 +146,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc = new StreamingContext(sc, batchDuration)
}
- test("waitForStop") {
+ test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register
@@ -158,13 +158,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
// test whether waitForStop() exits after give amount of time
failAfter(1000 millis) {
- ssc.waitForStop(500)
+ ssc.awaitTermination(500)
}
// test whether waitForStop() does not exit if not time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
- ssc.waitForStop()
+ ssc.awaitTermination()
throw new Exception("Did not wait for stop")
}
}
@@ -178,11 +178,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc.stop()
}
}.start()
- ssc.waitForStop()
+ ssc.awaitTermination()
}
}
- test("waitForStop with error in task") {
+ test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x})
@@ -190,19 +190,19 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
val exception = intercept[Exception] {
ssc.start()
- ssc.waitForStop(5000)
+ ssc.awaitTermination(5000)
}
assert(exception.getMessage.contains("map task"), "Expected exception not thrown")
}
- test("waitForStop with error in job generation") {
+ test("awaitTermination with error in job generation") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
val exception = intercept[TestException] {
ssc.start()
- ssc.waitForStop(5000)
+ ssc.awaitTermination(5000)
}
assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index a8ff444109..63a07cfbdf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -273,7 +273,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- ssc.waitForStop(50)
+ ssc.awaitTermination(50)
}
val timeTaken = System.currentTimeMillis() - startTime
logInfo("Output generated in " + timeTaken + " milliseconds")