author     Reynold Xin <rxin@apache.org>    2014-09-27 00:57:26 -0700
committer  Reynold Xin <rxin@apache.org>    2014-09-27 00:57:26 -0700
commit     436a7730b6e7067f74b3739a3a412490003f7c4c (patch)
tree       ceb5769202f55f18137ea9d22b1b60d44ff39de4 /streaming
parent     2d972fd84ac54a89e416442508a6d4eaeff452c1 (diff)
Minor cleanup to tighten visibility and remove compilation warning.
Author: Reynold Xin <rxin@apache.org>

Closes #2555 from rxin/cleanup and squashes the following commits:

6add199 [Reynold Xin] Minor cleanup to tighten visibility and remove compilation warning.
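The changes in this module are mechanical call-site cleanups: explicit parentheses on side-effecting methods (register(), count(), run()), named boolean arguments (stop(stopSparkContext = false)), and contains(x) in place of exists(_ == x). A minimal sketch of those conventions, using a hypothetical SimpleStream class that is not part of the patch:

// Hypothetical stand-in; only the call-site style mirrors the patch.
class SimpleStream {
  def register(): Unit = println("registered")   // side-effecting method declared with ()
  def stop(stopContext: Boolean = true): Unit = println(s"stopped (context kept: ${!stopContext})")
}

val s = new SimpleStream
s.register()                  // parentheses make the side effect explicit at the call site
s.stop(stopContext = false)   // naming the boolean flag documents what false means here
Seq(1, 2, 3).contains(2)      // clearer than Seq(1, 2, 3).exists(_ == 2)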
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  47
1 file changed, 24 insertions, 23 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ebf83748ff..655cec1573 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -19,18 +19,18 @@ package org.apache.spark.streaming
import java.util.concurrent.atomic.AtomicInteger
-import scala.language.postfixOps
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.SpanSugar._
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
-import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.Timeouts
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.SpanSugar._
+
class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
@@ -68,7 +68,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("from no conf + spark home + env") {
ssc = new StreamingContext(master, appName, batchDuration,
sparkHome, Nil, Map(envPair))
- assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
+ assert(ssc.conf.getExecutorEnv.contains(envPair))
}
test("from conf with settings") {
@@ -94,7 +94,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
myConf.set("spark.cleaner.ttl", "10")
val ssc1 = new StreamingContext(myConf, batchDuration)
- addInputStream(ssc1).register
+ addInputStream(ssc1).register()
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
@@ -107,7 +107,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("start and stop state check") {
ssc = new StreamingContext(master, appName, batchDuration)
- addInputStream(ssc).register
+ addInputStream(ssc).register()
assert(ssc.state === ssc.StreamingContextState.Initialized)
ssc.start()
@@ -118,7 +118,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
- addInputStream(ssc).register
+ addInputStream(ssc).register()
ssc.start()
intercept[SparkException] {
ssc.start()
@@ -127,7 +127,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
- addInputStream(ssc).register
+ addInputStream(ssc).register()
ssc.start()
ssc.stop()
ssc.stop()
@@ -135,7 +135,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop before start and start after stop") {
ssc = new StreamingContext(master, appName, batchDuration)
- addInputStream(ssc).register
+ addInputStream(ssc).register()
ssc.stop() // stop before start should not throw exception
ssc.start()
ssc.stop()
@@ -147,12 +147,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
- addInputStream(ssc).register
+ addInputStream(ssc).register()
ssc.start()
- ssc.stop(false)
+ ssc.stop(stopSparkContext = false)
assert(sc.makeRDD(1 to 100).collect().size === 100)
ssc = new StreamingContext(sc, batchDuration)
- addInputStream(ssc).register
+ addInputStream(ssc).register()
ssc.start()
ssc.stop()
}
@@ -167,11 +167,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.receiverStream(new TestReceiver)
- input.count.foreachRDD(rdd => {
+ input.count().foreachRDD { rdd =>
val count = rdd.first()
runningCount += count.toInt
logInfo("Count = " + count + ", Running count = " + runningCount)
- })
+ }
ssc.start()
ssc.awaitTermination(500)
ssc.stop(stopSparkContext = false, stopGracefully = true)
@@ -191,7 +191,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
- inputStream.map(x => x).register
+ inputStream.map(x => x).register()
// test whether start() blocks indefinitely or not
failAfter(2000 millis) {
@@ -215,7 +215,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
// test whether wait exits if context is stopped
failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
new Thread() {
- override def run {
+ override def run() {
Thread.sleep(500)
ssc.stop()
}
@@ -239,8 +239,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
- inputStream.map(x => { throw new TestException("error in map task"); x})
- .foreachRDD(_.count)
+ inputStream
+ .map { x => throw new TestException("error in map task"); x }
+ .foreachRDD(_.count())
val exception = intercept[Exception] {
ssc.start()
@@ -252,7 +253,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("awaitTermination with error in job generation") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
- inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
+ inputStream.transform { rdd => throw new TestException("error in transform"); rdd }.register()
val exception = intercept[TestException] {
ssc.start()
ssc.awaitTermination(5000)
@@ -265,7 +266,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
def addInputStream(s: StreamingContext): DStream[Int] = {
- val input = (1 to 100).map(i => (1 to i))
+ val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)
inputStream
}
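The assertions in this suite rely on ScalaTest's timeout and exception-interception idioms (Timeouts/failAfter with SpanSugar durations, and intercept from Assertions), all visible in the imports above. A minimal standalone sketch of that pattern; the test body is illustrative and not taken from the suite:

import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

// Minimal illustration of the failAfter/intercept idioms used by StreamingContextSuite.
class TimeoutIdiomSuite extends FunSuite with Timeouts {
  test("fails fast and checks the thrown exception") {
    failAfter(2000.millis) {                        // abort this block if it runs longer than 2 seconds
      val e = intercept[IllegalStateException] {    // capture an exception we expect to be thrown
        throw new IllegalStateException("boom")
      }
      assert(e.getMessage === "boom")
    }
  }
}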