diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index ebf83748ff..655cec1573 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -19,18 +19,18 @@ package org.apache.spark.streaming import java.util.concurrent.atomic.AtomicInteger -import scala.language.postfixOps +import org.scalatest.{Assertions, BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Timeouts +import org.scalatest.concurrent.Eventually._ +import org.scalatest.exceptions.TestFailedDueToTimeoutException +import org.scalatest.time.SpanSugar._ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.scalatest.{Assertions, BeforeAndAfter, FunSuite} -import org.scalatest.concurrent.Timeouts -import org.scalatest.concurrent.Eventually._ -import org.scalatest.exceptions.TestFailedDueToTimeoutException -import org.scalatest.time.SpanSugar._ + class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging { @@ -68,7 +68,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("from no conf + spark home + env") { ssc = new StreamingContext(master, appName, batchDuration, sparkHome, Nil, Map(envPair)) - assert(ssc.conf.getExecutorEnv.exists(_ == envPair)) + assert(ssc.conf.getExecutorEnv.contains(envPair)) } test("from conf with settings") { @@ -94,7 +94,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) myConf.set("spark.cleaner.ttl", "10") val ssc1 = new StreamingContext(myConf, batchDuration) - addInputStream(ssc1).register + addInputStream(ssc1).register() ssc1.start() val cp = new Checkpoint(ssc1, Time(1000)) assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10") @@ -107,7 +107,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("start and stop state check") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() assert(ssc.state === ssc.StreamingContextState.Initialized) ssc.start() @@ -118,7 +118,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("start multiple times") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() intercept[SparkException] { ssc.start() @@ -127,7 +127,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop multiple times") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() ssc.stop() ssc.stop() @@ -135,7 +135,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop before start and start after stop") { ssc = new StreamingContext(master, appName, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.stop() // stop before start should not throw exception ssc.start() ssc.stop() @@ -147,12 +147,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) sc = ssc.sparkContext - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() - ssc.stop(false) + ssc.stop(stopSparkContext = false) assert(sc.makeRDD(1 to 100).collect().size === 100) ssc = new StreamingContext(sc, batchDuration) - addInputStream(ssc).register + addInputStream(ssc).register() ssc.start() ssc.stop() } @@ -167,11 +167,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w var runningCount = 0 TestReceiver.counter.set(1) val input = ssc.receiverStream(new TestReceiver) - input.count.foreachRDD(rdd => { + input.count().foreachRDD { rdd => val count = rdd.first() runningCount += count.toInt logInfo("Count = " + count + ", Running count = " + runningCount) - }) + } ssc.start() ssc.awaitTermination(500) ssc.stop(stopSparkContext = false, stopGracefully = true) @@ -191,7 +191,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.map(x => x).register + inputStream.map(x => x).register() // test whether start() blocks indefinitely or not failAfter(2000 millis) { @@ -215,7 +215,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w // test whether wait exits if context is stopped failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown new Thread() { - override def run { + override def run() { Thread.sleep(500) ssc.stop() } @@ -239,8 +239,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("awaitTermination with error in task") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.map(x => { throw new TestException("error in map task"); x}) - .foreachRDD(_.count) + inputStream + .map { x => throw new TestException("error in map task"); x } + .foreachRDD(_.count()) val exception = intercept[Exception] { ssc.start() @@ -252,7 +253,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w test("awaitTermination with error in job generation") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) - inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register + inputStream.transform { rdd => throw new TestException("error in transform"); rdd }.register() val exception = intercept[TestException] { ssc.start() ssc.awaitTermination(5000) @@ -265,7 +266,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } def addInputStream(s: StreamingContext): DStream[Int] = { - val input = (1 to 100).map(i => (1 to i)) + val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1) inputStream } |