author     Sean Owen <sowen@cloudera.com>    2016-03-03 09:54:09 +0000
committer  Sean Owen <sowen@cloudera.com>    2016-03-03 09:54:09 +0000
commit     e97fc7f176f8bf501c9b3afd8410014e3b0e1602 (patch)
tree       23a11a3646b13195aaf50078a0f35fad96190618 /streaming
parent     02b7677e9584f5ccd68869abdb0bf980dc847ce1 (diff)
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?

Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:

- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert, vs junit.assert
- get(a) + getOrElse(b) -> getOrElse(a,b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == ) -> contains; find + nonEmpty -> exists; filter + size -> count
- reduce(_+_) -> sum; map + flatten -> flatMap

The most controversial may be .size -> .length, simply because of the size of the change. It is intended to avoid implicit conversions that might be expensive in some places.

## How was this patch tested?

Existing Jenkins unit tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11292 from srowen/SPARK-13423.
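To make the list above concrete, here is a minimal, self-contained Scala sketch (not code from this patch; the collections and values are invented for illustration) showing the before/after form of several of the rewrites and asserting that each pair is equivalent:

```scala
object StaticAnalysisPatterns {
  def main(args: Array[String]): Unit = {
    val xs = Array(1, 2, 3, 4)
    val names = Seq("alpha", "beta", "gamma")

    // Array/String .size -> .length: .size on an Array goes through an implicit
    // wrapper, while .length is a direct field access.
    val before1 = xs.size
    val after1  = xs.length

    // exists(_ == x) -> contains(x)
    val before2 = names.exists(_ == "beta")
    val after2  = names.contains("beta")

    // find(p).nonEmpty -> exists(p): avoids building an Option just to test it
    val before3 = names.find(_.startsWith("g")).nonEmpty
    val after3  = names.exists(_.startsWith("g"))

    // filter(p).size -> count(p): avoids materializing the intermediate collection
    val before4 = names.filter(_.length > 4).size
    val after4  = names.count(_.length > 4)

    // reduce(_ + _) -> sum
    val before5 = xs.reduce(_ + _)
    val after5  = xs.sum

    // map(f).flatten -> flatMap(f): one pass instead of two
    val before6 = names.map(_.toSeq).flatten
    val after6  = names.flatMap(_.toSeq)

    assert(before1 == after1 && before2 == after2 && before3 == after3 &&
      before4 == after4 && before5 == after5 && before6 == after6)
    println("all pairs equivalent")
  }
}
```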
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala | 8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 8
15 files changed, 25 insertions, 25 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index eedb42c061..d397688245 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -160,7 +160,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
require(batchDuration != null, "Batch duration has not been set")
// assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
// " is very low")
- require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
+ require(getOutputStreams().nonEmpty, "No output operations registered, so nothing to execute")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index a25dada5ea..7fba2e8ec0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -276,7 +276,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
config)
case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
}
- if (rdd.partitions.size == 0) {
+ if (rdd.partitions.isEmpty) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 49d8f14f4c..fd3e72e41b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -108,7 +108,7 @@ abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not
// others then that is unexpected and log a warning accordingly.
- if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
+ if (blockInfos.exists(_.walRecordHandleOption.nonEmpty)) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
@@ -119,7 +119,7 @@ abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
val validBlockIds = blockIds.filter { id =>
ssc.sparkContext.env.blockManager.master.contains(id)
}
- if (validBlockIds.size != blockIds.size) {
+ if (validBlockIds.length != blockIds.length) {
logWarning("Some blocks could not be recovered as they were not found in memory. " +
"To prevent such data loss, enable Write Ahead Log (see programming guide " +
"for more details.")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 5359549085..817ecc1ace 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -129,7 +129,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val numNewValues = newRDDs.size
val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
- if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
+ if (arrayOfValues.length != 1 + numOldValues + numNewValues) {
throw new Exception("Unexpected number of sequences of reduced values")
}
// Getting reduced values "old time steps" that will be removed from current window
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 2b07dd6185..c1846a31f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -29,8 +29,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
require(parents.length > 0, "List of DStreams to union is empty")
- require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
- require(parents.map(_.slideDuration).distinct.size == 1,
+ require(parents.map(_.ssc).distinct.length == 1, "Some of the DStreams have different contexts")
+ require(parents.map(_.slideDuration).distinct.length == 1,
"Some of the DStreams have different slide durations")
override def dependencies: List[DStream[_]] = parents.toList
@@ -44,7 +44,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
case None => throw new SparkException("Could not generate RDD from a parent for unifying at" +
s" time $validTime")
}
- if (rdds.size > 0) {
+ if (rdds.nonEmpty) {
Some(new UnionRDD(ssc.sc, rdds))
} else {
None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index a3ad5eaa40..a83c0d922d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -218,12 +218,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
- logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
+ logInfo("Batches pending processing (" + pendingTimes.length + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
.distinct.sorted(Time.ordering)
- logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
+ logInfo("Batches to reschedule (" + timesToReschedule.length + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach { time =>
// Allocate the related blocks when recovering from failure, because some blocks that were
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 678f1dc950..6e24365c4e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -131,7 +131,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Track the active receiver job number. When a receiver job exits ultimately, countDown will
// be called.
- private val receiverJobExitLatch = new CountDownLatch(receiverInputStreams.size)
+ private val receiverJobExitLatch = new CountDownLatch(receiverInputStreams.length)
/**
* Track all receivers' information. The key is the receiver id, the value is the receiver info.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index e235afad5e..c8a2c17bbd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -259,7 +259,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
} else {
var nextLineIndex = failure.indexOf("\n")
if (nextLineIndex < 0) {
- nextLineIndex = failure.size
+ nextLineIndex = failure.length
}
val firstLine = failure.substring(0, nextLineIndex)
s"Failed due to error: $firstLine\n$failure"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 30a3a98c01..430f35a400 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -161,7 +161,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def numInactiveReceivers: Int = {
- ssc.graph.getReceiverInputStreams().size - numActiveReceivers
+ ssc.graph.getReceiverInputStreams().length - numActiveReceivers
}
def numTotalCompletedBatches: Long = synchronized {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index c5d9f26cb2..2a5d0839fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -444,7 +444,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
}.getOrElse(emptyCell)
val receiverLastError = receiverInfo.map { info =>
val msg = s"${info.lastErrorMessage} - ${info.lastError}"
- if (msg.size > 100) msg.take(97) + "..." else msg
+ if (msg.length > 100) msg.take(97) + "..." else msg
}.getOrElse(emptyCell)
val receiverLastErrorTime = receiverInfo.map {
r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
index a485a46937..9b1c939e93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala
@@ -136,7 +136,7 @@ private[streaming] object UIUtils {
} else {
var nextLineIndex = failure.indexOf("\n")
if (nextLineIndex < 0) {
- nextLineIndex = failure.size
+ nextLineIndex = failure.length
}
val firstLine = failure.substring(0, nextLineIndex)
s"Failed due to error: $firstLine\n$failure"
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index f1c64799c6..bd60059b18 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -76,7 +76,7 @@ class BasicOperationsSuite extends TestSuiteBase {
assert(numInputPartitions === 2, "Number of input partitions has been changed from 2")
val input = Seq(1 to 4, 5 to 8, 9 to 12)
val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23))
- val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _)))
+ val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.sum))
testOperation(input, operation, output, true)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 6c60652cd6..19c89fcf67 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -278,7 +278,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(data: Iterable[Long]): Boolean = {
- !data.sliding(2).map{itr => itr.size == 2 && itr.head > itr.tail.head }.contains(true)
+ !data.sliding(2).exists { itr => itr.size == 2 && itr.head > itr.tail.head }
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 96dd4757be..3f12de38ef 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -161,12 +161,12 @@ class UISeleniumSuite
jobLinks.size should be (4)
// Check stage progress
- findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be
- (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+ findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toList should be (
+ List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
// Check job progress
- findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be
- (List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
+ findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
+ List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
// Check stacktrace
val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq
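A note on the UISeleniumSuite hunk above: with ScalaTest matchers, writing `x should be` and then the expected value in parentheses on the next line compiles but never applies the matcher, because Scala infers a statement break after `be` and the parenthesized value becomes a discarded expression. That is presumably why fixing the parenthesization also meant updating the job-progress expectation to the values the test actually observes. A minimal standalone sketch of the gotcha (class name and values are hypothetical; ScalaTest 2.x/3.0-style `FlatSpec`/`Matchers` assumed on the classpath):

```scala
import org.scalatest.{FlatSpec, Matchers}

class DanglingMatcherSpec extends FlatSpec with Matchers {
  "a matcher split across lines" should "not actually assert anything" in {
    val cells = List("1/1", "0/1 (1 failed)")

    // Looks like an assertion, but a statement break is inferred after `be`,
    // so the List on the next line is just an unused expression: this "passes"
    // even though the expected value is wrong.
    cells should be
    (List("totally", "wrong"))

    // The fixed form keeps the argument attached to the matcher, so the
    // comparison really happens.
    cells should be (List("1/1", "0/1 (1 failed)"))
  }
}
```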
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 734dd93cda..7460e8629b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -74,12 +74,12 @@ abstract class CommonWriteAheadLogTests(
test(testPrefix + "read all logs") {
// Write data manually for testing reading through WriteAheadLog
- val writtenData = (1 to 10).map { i =>
+ val writtenData = (1 to 10).flatMap { i =>
val data = generateRandomData()
val file = testDir + s"/log-$i-$i"
writeDataManually(data, file, allowBatching)
data
- }.flatten
+ }
val logDirectoryPath = new Path(testDir)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
@@ -193,12 +193,12 @@ abstract class CommonWriteAheadLogTests(
test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
// write some data
- val writtenData = (1 to 10).map { i =>
+ val writtenData = (1 to 10).flatMap { i =>
val data = generateRandomData()
val file = testDir + s"/log-$i-$i"
writeDataManually(data, file, allowBatching)
data
- }.flatten
+ }
val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
// create iterator but don't materialize it