diff options
author | Sean Owen <sowen@cloudera.com> | 2016-03-03 09:54:09 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-03 09:54:09 +0000 |
commit | e97fc7f176f8bf501c9b3afd8410014e3b0e1602 (patch) | |
tree | 23a11a3646b13195aaf50078a0f35fad96190618 /streaming/src/test | |
parent | 02b7677e9584f5ccd68869abdb0bf980dc847ce1 (diff) | |
download | spark-e97fc7f176f8bf501c9b3afd8410014e3b0e1602.tar.gz spark-e97fc7f176f8bf501c9b3afd8410014e3b0e1602.tar.bz2 spark-e97fc7f176f8bf501c9b3afd8410014e3b0e1602.zip |
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?
Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:
- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert, vs junit.assert
- get(a) + getOrElse(b) -> getOrElse(a,b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == ) -> contains find + nonEmpty -> exists filter + size -> count
- reduce(_+_) -> sum map + flatten -> map
The most controversial may be .size -> .length simply because of its size. It is intended to avoid implicits that might be expensive in some places.
## How was the this patch tested?
Existing Jenkins unit tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #11292 from srowen/SPARK-13423.
Diffstat (limited to 'streaming/src/test')
4 files changed, 10 insertions, 10 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index f1c64799c6..bd60059b18 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -76,7 +76,7 @@ class BasicOperationsSuite extends TestSuiteBase { assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") val input = Seq(1 to 4, 5 to 8, 9 to 12) val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23)) - val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _))) + val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.sum)) testOperation(input, operation, output, true) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 6c60652cd6..19c89fcf67 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -278,7 +278,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { /** Check if a sequence of numbers is in increasing order */ def isInIncreasingOrder(data: Iterable[Long]): Boolean = { - !data.sliding(2).map{itr => itr.size == 2 && itr.head > itr.tail.head }.contains(true) + !data.sliding(2).exists { itr => itr.size == 2 && itr.head > itr.tail.head } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index 96dd4757be..3f12de38ef 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -161,12 +161,12 @@ class UISeleniumSuite jobLinks.size should be (4) // Check stage progress - findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be - (List("1/1", "1/1", "1/1", "0/1 (1 failed)")) + findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toList should be ( + List("1/1", "1/1", "1/1", "0/1 (1 failed)")) // Check job progress - findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be - (List("1/1", "1/1", "1/1", "0/1 (1 failed)")) + findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be ( + List("4/4", "4/4", "4/4", "0/4 (1 failed)")) // Check stacktrace val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 734dd93cda..7460e8629b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -74,12 +74,12 @@ abstract class CommonWriteAheadLogTests( test(testPrefix + "read all logs") { // Write data manually for testing reading through WriteAheadLog - val writtenData = (1 to 10).map { i => + val writtenData = (1 to 10).flatMap { i => val data = generateRandomData() val file = testDir + s"/log-$i-$i" writeDataManually(data, file, allowBatching) data - }.flatten + } val logDirectoryPath = new Path(testDir) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) @@ -193,12 +193,12 @@ abstract class CommonWriteAheadLogTests( test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") { // write some data - val writtenData = (1 to 10).map { i => + val writtenData = (1 to 10).flatMap { i => val data = generateRandomData() val file = testDir + s"/log-$i-$i" writeDataManually(data, file, allowBatching) data - }.flatten + } val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching) // create iterator but don't materialize it |