about summary refs log tree commit diff
path: root/streaming/src/test/scala/org/apache
diff options
context:
space:
mode:
author    Reynold Xin <rxin@databricks.com>  2015-05-28 17:55:22 -0700
committer Reynold Xin <rxin@databricks.com>  2015-05-28 17:55:22 -0700
commit 3af0b3136e4b7dea52c413d640653ccddc638574 (patch)
tree 4963a880985f4833f306f34d702ffbc0f1eeec3b /streaming/src/test/scala/org/apache
parent 1bd63e82fdb6ee57c61051430d63685b801df016 (diff)
download spark-3af0b3136e4b7dea52c413d640653ccddc638574.tar.gz
spark-3af0b3136e4b7dea52c413d640653ccddc638574.tar.bz2
spark-3af0b3136e4b7dea52c413d640653ccddc638574.zip
[SPARK-7927] whitespace fixes for streaming.
So we can enable a whitespace enforcement rule in the style checker to save code review time. Author: Reynold Xin <rxin@databricks.com> Closes #6475 from rxin/whitespace-streaming and squashes the following commits: 810dae4 [Reynold Xin] Fixed tests. 89068ad [Reynold Xin] [SPARK-7927] whitespace fixes for streaming.
Diffstat (limited to 'streaming/src/test/scala/org/apache')
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 6
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 6
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 2
5 files changed, 12 insertions, 8 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index f269cb74e0..08faeaa58f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -255,7 +255,7 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
- s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
+ s1.map(x => (x, 1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}
@@ -427,9 +427,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("updateStateByKey - object lifecycle") {
val inputData =
Seq(
- Seq("a","b"),
+ Seq("a", "b"),
null,
- Seq("a","c","a"),
+ Seq("a", "c", "a"),
Seq("c"),
null,
null
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 0122514f93..b74d67c63a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -418,7 +418,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
val servingThread = new Thread() {
override def run() {
try {
- while(true) {
+ while (true) {
logInfo("Accepting connections on port " + port)
val clientSocket = serverSocket.accept()
if (startLatch.getCount == 1) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f8e8030791..e36c7914b1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -732,7 +732,9 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
def onStop() {
// Simulate slow receiver by waiting for all records to be produced
- while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
+ while (!SlowTestReceiver.receivedAllRecords) {
+ Thread.sleep(100)
+ }
// no clean to be done, the receiving thread should stop on it own
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 312cce408c..1dc8960d60 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -133,8 +133,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
- for(i <- 1 until seq.size) {
- if (seq(i - 1) > seq(i)) return false
+ for (i <- 1 until seq.size) {
+ if (seq(i - 1) > seq(i)) {
+ return false
+ }
}
true
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 2a0f45830e..c9175d61b1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -64,7 +64,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalReceivedRecords should be (0)
// onBatchStarted
- val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))