author     Reynold Xin <rxin@databricks.com>    2015-04-08 00:24:59 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-08 00:24:59 -0700
commit     15e0d2bd1304db62fad286c1bb687e87c361e16c (patch)
tree       bcdc2f0018de4cbd492c815428fadb671fb74bcf
parent     8d2a36c0fdfbea9f58271ef6aeb89bb79b22cf62 (diff)
[SPARK-6765] Fix test code style for streaming.
So we can turn the style checker on for test code.

Author: Reynold Xin <rxin@databricks.com>

Closes #5409 from rxin/test-style-streaming and squashes the following commits:

7aea69b [Reynold Xin] [SPARK-6765] Fix test code style for streaming.
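The hunks below apply two recurring fixes so that Scalastyle can be enabled for the streaming test code: public members in the test suites get explicit result types, and declarations longer than the line-length limit (100 characters in Spark's Scalastyle configuration) are wrapped. A minimal sketch of both patterns, using a hypothetical StyleSketch object and DemoReceiver class rather than the suites actually touched by this commit:

    import org.apache.spark.Logging
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Duration, Milliseconds}
    import org.apache.spark.streaming.receiver.Receiver

    object StyleSketch {
      // Before (result type left to inference, flagged by the style checker):
      //   def batchDuration = Milliseconds(500)
      // After (explicit result type on a public member):
      def batchDuration: Duration = Milliseconds(500)
    }

    // Before: a single declaration line over 100 characters, e.g.
    //   class DemoReceiver(n: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { ... }
    // After: the extends clause is wrapped onto its own line.
    class DemoReceiver(totalRecords: Int, recordsPerSecond: Int)
      extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {

      def onStart(): Unit = { }
      def onStop(): Unit = { }
    }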
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 29
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 4
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 45
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 15
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 5
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 28
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala | 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 12
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala | 3
19 files changed, 115 insertions(+), 75 deletions(-)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index e04d4088df..2edea9b5b6 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -1,21 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
@@ -213,7 +212,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
assert(counter === totalEventsPerChannel * channels.size)
}
- def assertChannelIsEmpty(channel: MemoryChannel) = {
+ def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
queueRemaining.setAccessible(true)
val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 51d273af8d..39e6754c81 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -151,7 +151,9 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
}
/** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
val encoder = new ZlibEncoder(compressionLevel)
pipeline.addFirst("deflater", encoder)
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 24d78ecb3a..a19a72c58a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -139,7 +139,8 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
msgTopic.publish(message)
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
- Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
+ // wait for Spark streaming to consume something from the message queue
+ Thread.sleep(50)
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index cf191715d2..87bc20f79c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -171,7 +171,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("flatMapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
- (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)),
+ (s: DStream[String]) => {
+ s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10))
+ },
Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ),
true
)
@@ -474,7 +476,7 @@ class BasicOperationsSuite extends TestSuiteBase {
stream.foreachRDD(_ => {}) // Dummy output stream
ssc.start()
Thread.sleep(2000)
- def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+ def getInputFromSlice(fromMillis: Long, toMillis: Long): Set[Int] = {
stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 91a2b2bba4..54c30440a6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -43,7 +43,7 @@ class CheckpointSuite extends TestSuiteBase {
var ssc: StreamingContext = null
- override def batchDuration = Milliseconds(500)
+ override def batchDuration: Duration = Milliseconds(500)
override def beforeFunction() {
super.beforeFunction()
@@ -72,7 +72,7 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
- Some((values.sum + state.getOrElse(0)))
+ Some(values.sum + state.getOrElse(0))
}
st.map(x => (x, 1))
.updateStateByKey(updateFunc)
@@ -199,7 +199,12 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
- Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ),
+ Seq(
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq(),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)), Seq() ),
3
)
}
@@ -212,7 +217,8 @@ class CheckpointSuite extends TestSuiteBase {
val n = 10
val w = 4
val input = (1 to n).map(_ => Seq("a")).toSeq
- val output = Seq(Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
+ val output = Seq(
+ Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 3))) ++ (1 to (n - w + 1)).map(x => Seq(("a", 4)))
val operation = (st: DStream[String]) => {
st.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
@@ -236,7 +242,13 @@ class CheckpointSuite extends TestSuiteBase {
classOf[TextOutputFormat[Text, IntWritable]])
output
},
- Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ Seq(
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq(),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq()),
3
)
} finally {
@@ -259,7 +271,13 @@ class CheckpointSuite extends TestSuiteBase {
classOf[NewTextOutputFormat[Text, IntWritable]])
output
},
- Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ Seq(
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq(),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq()),
3
)
} finally {
@@ -298,7 +316,13 @@ class CheckpointSuite extends TestSuiteBase {
output
}
},
- Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ Seq(
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq(),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("", 2)),
+ Seq()),
3
)
} finally {
@@ -533,7 +557,8 @@ class CheckpointSuite extends TestSuiteBase {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
+ {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
for (i <- 1 to numBatches.toInt) {
@@ -543,7 +568,7 @@ class CheckpointSuite extends TestSuiteBase {
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
Thread.sleep(batchDuration.milliseconds)
- val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+ val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
}.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
outputStream.output.map(_.flatten)
@@ -552,4 +577,4 @@ class CheckpointSuite extends TestSuiteBase {
private object CheckpointSuite extends Serializable {
var batchThreeShouldBlockIndefinitely: Boolean = true
-}
\ No newline at end of file
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 26435d8515..0c4c06534a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -29,9 +29,9 @@ class FailureSuite extends TestSuiteBase with Logging {
val directory = Utils.createTempDir()
val numBatches = 30
- override def batchDuration = Milliseconds(1000)
+ override def batchDuration: Duration = Milliseconds(1000)
- override def useManualClock = false
+ override def useManualClock: Boolean = false
override def afterFunction() {
Utils.deleteRecursively(directory)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7ed6320a3d..e6ac4975c5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -52,7 +52,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
- def output = outputBuffer.flatMap(x => x)
+ def output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()
@@ -164,7 +164,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
- def output = outputBuffer.flatMap(x => x)
+ def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
outputStream.register()
ssc.start()
@@ -196,7 +196,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val queueStream = ssc.queueStream(queue, oneAtATime = true)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
- def output = outputBuffer.filter(_.size > 0)
+ def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()
@@ -204,7 +204,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
- //Thread.sleep(1000)
+
val inputIterator = input.toIterator
for (i <- 0 until input.size) {
// Enqueue more than 1 item per tick but they should dequeue one at a time
@@ -239,7 +239,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val queueStream = ssc.queueStream(queue, oneAtATime = false)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(queueStream, outputBuffer)
- def output = outputBuffer.filter(_.size > 0)
+ def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
outputStream.register()
ssc.start()
@@ -352,7 +352,8 @@ class TestServer(portToBind: Int = 0) extends Logging {
logInfo("New connection")
try {
clientSocket.setTcpNoDelay(true)
- val outputStream = new BufferedWriter(new OutputStreamWriter(clientSocket.getOutputStream))
+ val outputStream = new BufferedWriter(
+ new OutputStreamWriter(clientSocket.getOutputStream))
while(clientSocket.isConnected) {
val msg = queue.poll(100, TimeUnit.MILLISECONDS)
@@ -384,7 +385,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
def stop() { servingThread.interrupt() }
- def port = serverSocket.getLocalPort
+ def port: Int = serverSocket.getLocalPort
}
/** This is a receiver to test multiple threads inserting data using block generator */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index ef4873de2f..c090eaec29 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -96,7 +96,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
- blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+ blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
}.toList
storedData shouldEqual data
@@ -120,7 +120,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
testBlockStoring(handler) { case (data, blockIds, storeResults) =>
// Verify the data in block manager is correct
val storedData = blockIds.flatMap { blockId =>
- blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+ blockManager.getLocal(blockId).map(_.data.map(_.toString).toList).getOrElse(List.empty)
}.toList
storedData shouldEqual data
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 42fad769f0..b63b37d9f9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -228,7 +228,8 @@ class ReceivedBlockTrackerSuite
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
*/
- def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles): Seq[ReceivedBlockTrackerLogEvent] = {
+ def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
+ : Seq[ReceivedBlockTrackerLogEvent] = {
logFiles.flatMap {
file => new WriteAheadLogReader(file, hadoopConf).toSeq
}.map { byteBuffer =>
@@ -244,7 +245,8 @@ class ReceivedBlockTrackerSuite
}
/** Create batch allocation object from the given info */
- def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo]): BatchAllocationEvent = {
+ def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
+ : BatchAllocationEvent = {
BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index aa20ad0b53..10c35cba8d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -308,7 +308,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
val errors = new ArrayBuffer[Throwable]
/** Check if all data structures are clean */
- def isAllEmpty = {
+ def isAllEmpty: Boolean = {
singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
arrayBuffers.isEmpty && errors.isEmpty
}
@@ -320,24 +320,21 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
- optionalBlockId: Option[StreamBlockId]
- ) {
+ optionalBlockId: Option[StreamBlockId]) {
byteBuffers += bytes
}
def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
- optionalBlockId: Option[StreamBlockId]
- ) {
+ optionalBlockId: Option[StreamBlockId]) {
iterators += iterator
}
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
- optionalBlockId: Option[StreamBlockId]
- ) {
+ optionalBlockId: Option[StreamBlockId]) {
arrayBuffers += arrayBuffer
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 2e5005ef6f..d1bbf39dc7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -213,7 +213,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
SlowTestReceiver.receivedAllRecords = false
- //Create test receiver that sleeps in onStop()
+ // Create test receiver that sleeps in onStop()
val totalNumRecords = 15
val recordsPerSecond = 1
val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
@@ -370,7 +370,8 @@ object TestReceiver {
}
/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
-class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
var receivingThreadOption: Option[Thread] = None
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index f52562b0a0..852e8bb71d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -38,8 +38,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
// To make sure that the processing start and end times in collected
// information are different for successive batches
- override def batchDuration = Milliseconds(100)
- override def actuallyWait = true
+ override def batchDuration: Duration = Milliseconds(100)
+ override def actuallyWait: Boolean = true
test("batch info reporting") {
val ssc = setupStreams(input, operation)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 3565d621e8..c3cae8aeb6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -53,8 +53,9 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
val selectedInput = if (index < input.size) input(index) else Seq[T]()
// lets us test cases where RDDs are not created
- if (selectedInput == null)
+ if (selectedInput == null) {
return None
+ }
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
logInfo("Created RDD " + rdd.id + " with " + selectedInput)
@@ -104,7 +105,9 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
output.clear()
}
- def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
+ def toTestOutputStream: TestOutputStream[T] = {
+ new TestOutputStream[T](this.parent, this.output.map(_.flatten))
+ }
}
/**
@@ -148,34 +151,34 @@ class BatchCounter(ssc: StreamingContext) {
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Name of the framework for Spark context
- def framework = this.getClass.getSimpleName
+ def framework: String = this.getClass.getSimpleName
// Master for Spark context
- def master = "local[2]"
+ def master: String = "local[2]"
// Batch duration
- def batchDuration = Seconds(1)
+ def batchDuration: Duration = Seconds(1)
// Directory where the checkpoint data will be saved
- lazy val checkpointDir = {
+ lazy val checkpointDir: String = {
val dir = Utils.createTempDir()
logDebug(s"checkpointDir: $dir")
dir.toString
}
// Number of partitions of the input parallel collections created for testing
- def numInputPartitions = 2
+ def numInputPartitions: Int = 2
// Maximum time to wait before the test times out
- def maxWaitTimeMillis = 10000
+ def maxWaitTimeMillis: Int = 10000
// Whether to use manual clock or not
- def useManualClock = true
+ def useManualClock: Boolean = true
// Whether to actually wait in real time before changing manual clock
- def actuallyWait = false
+ def actuallyWait: Boolean = false
- //// A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
+ // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
val conf = new SparkConf()
.setMaster(master)
.setAppName(framework)
@@ -346,7 +349,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Wait until expected number of output items have been generated
val startTime = System.currentTimeMillis()
- while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ while (output.size < numExpectedOutput &&
+ System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
ssc.awaitTerminationOrTimeout(50)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 87a0395efb..998426ebb8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -32,7 +32,8 @@ import org.apache.spark._
/**
* Selenium tests for the Spark Web UI.
*/
-class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
+class UISeleniumSuite
+ extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
implicit var webDriver: WebDriver = _
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index a5d2bb2fde..c39ad05f41 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -22,9 +22,9 @@ import org.apache.spark.storage.StorageLevel
class WindowOperationsSuite extends TestSuiteBase {
- override def maxWaitTimeMillis = 20000 // large window tests can sometimes take longer
+ override def maxWaitTimeMillis: Int = 20000 // large window tests can sometimes take longer
- override def batchDuration = Seconds(1) // making sure its visible in this class
+ override def batchDuration: Duration = Seconds(1) // making sure its visible in this class
val largerSlideInput = Seq(
Seq(("a", 1)),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 7a6a2f3e57..c3602a5b73 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -28,10 +28,13 @@ import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBloc
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
import org.apache.spark.util.Utils
-class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+class WriteAheadLogBackedBlockRDDSuite
+ extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
+
val hadoopConf = new Configuration()
var sparkContext: SparkContext = null
@@ -86,7 +89,8 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll w
* @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
* @param testStoreInBM Test whether blocks read from log are stored back into block manager
*/
- private def testRDD(numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
+ private def testRDD(
+ numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
val numBlocks = numPartitionsInBM + numPartitionsInWAL
val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
@@ -110,7 +114,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll w
"Unexpected blocks in BlockManager"
)
- // Make sure that the right `numPartitionsInWAL` blocks are in write ahead logs, and other are not
+ // Make sure that the right `numPartitionsInWAL` blocks are in WALs, and other are not
require(
segments.takeRight(numPartitionsInWAL).forall(s =>
new File(s.path.stripPrefix("file://")).exists()),
@@ -152,6 +156,6 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll w
}
private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
- Array.fill(count)(new WriteAheadLogFileSegment("random", 0l, 0))
+ Array.fill(count)(new WriteAheadLogFileSegment("random", 0L, 0))
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index 4150b60635..7865b06c2e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -90,7 +90,7 @@ class JobGeneratorSuite extends TestSuiteBase {
val receiverTracker = ssc.scheduler.receiverTracker
// Get the blocks belonging to a batch
- def getBlocksOfBatch(batchTime: Long) = {
+ def getBlocksOfBatch(batchTime: Long): Seq[ReceivedBlockInfo] = {
receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8335659667..a3919c43b9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -291,7 +291,7 @@ object WriteAheadLogSuite {
manager
}
- /** Read data from a segments of a log file directly and return the list of byte buffers.*/
+ /** Read data from a segments of a log file directly and return the list of byte buffers. */
def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
segments.map { segment =>
val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
diff --git a/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
index d0bf328f2b..d667504630 100644
--- a/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
@@ -25,7 +25,8 @@ package org.apache.spark.streamingtest
*/
class ImplicitSuite {
- // We only want to test if `implict` works well with the compiler, so we don't need a real DStream.
+ // We only want to test if `implicit` works well with the compiler,
+ // so we don't need a real DStream.
def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null
def testToPairDStreamFunctions(): Unit = {