path: root/streaming/src/test
author     Sean Owen <sowen@cloudera.com>    2016-03-03 09:54:09 +0000
committer  Sean Owen <sowen@cloudera.com>    2016-03-03 09:54:09 +0000
commit     e97fc7f176f8bf501c9b3afd8410014e3b0e1602 (patch)
tree       23a11a3646b13195aaf50078a0f35fad96190618 /streaming/src/test
parent     02b7677e9584f5ccd68869abdb0bf980dc847ce1 (diff)
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?

Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:

- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert, vs junit.assert
- get(a) + getOrElse(b) -> getOrElse(a, b)
- Array/String .size -> .length (occasionally -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == x) -> contains(x)
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_+_) -> sum
- map + flatten -> flatMap

The most controversial may be .size -> .length, simply because of the number of call sites it touches. It is intended to avoid implicit conversions that might be expensive in some places.

## How was this patch tested?

Existing Jenkins unit tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11292 from srowen/SPARK-13423.
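Several of these rewrites are purely mechanical. A minimal sketch of a few of them, using illustrative values that are not from the patch itself:

```scala
object RewriteExamples {
  val m = Map("a" -> 1)
  val xs = Seq(1, 2, 3)

  // get(a) + getOrElse(b) -> getOrElse(a, b)
  val before1 = m.get("a").getOrElse(0)
  val after1  = m.getOrElse("a", 0)     // same result, one expression

  // exists(_ == x) -> contains(x)
  val before2 = xs.exists(_ == 2)
  val after2  = xs.contains(2)

  // filter + size -> count: avoids building an intermediate collection
  val before3 = xs.filter(_ > 1).size
  val after3  = xs.count(_ > 1)

  // find + nonEmpty -> exists
  val before4 = xs.find(_ > 2).nonEmpty
  val after4  = xs.exists(_ > 2)
}
```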
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala    2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala         8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala 8
4 files changed, 10 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index f1c64799c6..bd60059b18 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -76,7 +76,7 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(numInputPartitions === 2, "Number of input partitions has been changed from 2")
val input = Seq(1 to 4, 5 to 8, 9 to 12)
val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23))
- val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _)))
+ val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.sum))
testOperation(input, operation, output, true)
}
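`.sum` is not just shorter than `reduce(_ + _)`: the two also differ on empty input, where `reduce` throws and `sum` returns the additive identity. In this test both partitions are non-empty, so behavior is unchanged. A minimal sketch of the difference (not from the suite):

```scala
object SumVsReduce extends App {
  println(Iterator(1, 2, 3).sum)   // 6, same as reduce(_ + _)

  val empty: Iterator[Int] = Iterator.empty
  println(empty.sum)               // 0: sum folds from the identity
  // empty.reduce(_ + _)           // would throw UnsupportedOperationException
}
```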
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 6c60652cd6..19c89fcf67 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -278,7 +278,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(data: Iterable[Long]): Boolean = {
- !data.sliding(2).map{itr => itr.size == 2 && itr.head > itr.tail.head }.contains(true)
+ !data.sliding(2).exists { itr => itr.size == 2 && itr.head > itr.tail.head }
}
}
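Besides reading more directly, `exists` short-circuits at the first decreasing pair, whereas the old `map(...).contains(true)` form evaluated the predicate for every window. A standalone sketch of the rewritten check, with sample inputs that are illustrative only:

```scala
object IncreasingOrderCheck extends App {
  // Same shape as the suite's helper; the size == 2 guard handles
  // single-element inputs, where sliding(2) yields one window of size 1.
  def isInIncreasingOrder(data: Iterable[Long]): Boolean =
    !data.sliding(2).exists { itr => itr.size == 2 && itr.head > itr.tail.head }

  println(isInIncreasingOrder(Seq(1L, 2L, 2L, 5L)))  // true (non-decreasing)
  println(isInIncreasingOrder(Seq(1L, 3L, 2L)))      // false (3 > 2)
}
```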
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 96dd4757be..3f12de38ef 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -161,12 +161,12 @@ class UISeleniumSuite
jobLinks.size should be (4)
// Check stage progress
- findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be
- (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+ findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toList should be (
+ List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
// Check job progress
- findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be
- (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+ findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
+ List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
// Check stacktrace
val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq
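This hunk fixes more than style. With the opening parenthesis on the following line, Scala infers a statement break: `... should be` is a complete, assertion-free expression in ScalaTest's Matchers DSL, and the `(List(...))` below it is an unrelated standalone expression, so the comparison never ran. That is consistent with the expected job-progress values changing from "1/1" to "4/4" in the same hunk: the dead assertion had been hiding the mismatch. A sketch of the pitfall, assuming the ScalaTest 2.x-era `org.scalatest.Matchers` trait the suite mixes in:

```scala
import org.scalatest.Matchers

object BeParenPitfall extends Matchers {
  val xs = List("4/4", "0/4 (1 failed)")

  // Broken: a newline before '(' splits this into two statements.
  // `xs should be` compiles (the DSL's bare-BeWord form) but checks nothing.
  xs should be
  (List("1/1", "0/1 (1 failed)"))   // evaluated and discarded; no failure

  // Fixed: the expected value is an argument to `be`, so a mismatch throws.
  xs should be (List("4/4", "0/4 (1 failed)"))
}
```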
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 734dd93cda..7460e8629b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -74,12 +74,12 @@ abstract class CommonWriteAheadLogTests(
test(testPrefix + "read all logs") {
// Write data manually for testing reading through WriteAheadLog
- val writtenData = (1 to 10).map { i =>
+ val writtenData = (1 to 10).flatMap { i =>
val data = generateRandomData()
val file = testDir + s"/log-$i-$i"
writeDataManually(data, file, allowBatching)
data
- }.flatten
+ }
val logDirectoryPath = new Path(testDir)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
@@ -193,12 +193,12 @@ abstract class CommonWriteAheadLogTests(
test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
// write some data
- val writtenData = (1 to 10).map { i =>
+ val writtenData = (1 to 10).flatMap { i =>
val data = generateRandomData()
val file = testDir + s"/log-$i-$i"
writeDataManually(data, file, allowBatching)
data
- }.flatten
+ }
val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
// create iterator but don't materialize it
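Both WriteAheadLogSuite hunks apply the same fusion: `flatMap` produces the flattened result in a single pass, without first building a `Seq[Seq[...]]` and then flattening it. A minimal sketch with a stand-in for the suite's `generateRandomData()` (the stub below is hypothetical, not the real helper):

```scala
import scala.util.Random

object FlatMapFusion extends App {
  // Hypothetical stand-in for the suite's generateRandomData() helper.
  def generateRandomData(): Seq[String] = Seq.fill(3)(Random.nextInt(100).toString)

  val viaMapFlatten: Seq[String] = (1 to 10).map { i => generateRandomData() }.flatten
  val viaFlatMap:    Seq[String] = (1 to 10).flatMap { i => generateRandomData() }

  // Same shape either way; flatMap skips the intermediate Seq[Seq[String]].
  println(viaMapFlatten.length == viaFlatMap.length)  // true: 30 elements each
}
```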