author     Sean Owen <sowen@cloudera.com>  2016-03-03 09:54:09 +0000
committer  Sean Owen <sowen@cloudera.com>  2016-03-03 09:54:09 +0000
commit     e97fc7f176f8bf501c9b3afd8410014e3b0e1602 (patch)
tree       23a11a3646b13195aaf50078a0f35fad96190618 /core
parent     02b7677e9584f5ccd68869abdb0bf980dc847ce1 (diff)
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?

Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:

- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert vs. JUnit Assert
- get(a) + getOrElse(b) -> getOrElse(a, b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == x) -> contains(x)
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_ + _) -> sum
- map + flatten -> flatMap

The most controversial may be .size -> .length, simply because of the sheer number of call sites it touches. It is intended to avoid implicit conversions that might be expensive in some places.

## How was this patch tested?

Existing Jenkins unit tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11292 from srowen/SPARK-13423.
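For readers skimming the diff below, here is a minimal, self-contained Scala sketch of several of the rewrite patterns listed above. The object and value names are illustrative only and do not appear in the patch itself:

```scala
object StaticAnalysisRewrites {
  // Illustrative data only; none of these names come from the patch.
  val props: Map[String, String] = Map("spark.app.name" -> "demo")
  val values: Seq[Int] = Seq(1, 2, 0, 3)

  // get(a) + getOrElse(b) -> getOrElse(a, b)
  val nameBefore = props.get("spark.app.name").getOrElse("default")
  val nameAfter  = props.getOrElse("spark.app.name", "default")

  // filter + size -> count
  val zerosBefore = values.filter(_ == 0).size
  val zerosAfter  = values.count(_ == 0)

  // exists(_ == x) -> contains(x)
  val hasZeroBefore = values.exists(_ == 0)
  val hasZeroAfter  = values.contains(0)

  // reduce(_ + _) -> sum
  val totalBefore = values.reduce(_ + _)
  val totalAfter  = values.sum

  // map + flatten -> flatMap
  val doubledBefore = values.map(v => Seq(v, v)).flatten
  val doubledAfter  = values.flatMap(v => Seq(v, v))
}
```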
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java | 4
-rw-r--r-- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/Dependency.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/Partitioner.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 7
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 1
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2
-rwxr-xr-x core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 7
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala | 5
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/util/Utils.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala | 2
-rw-r--r-- core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java | 36
-rw-r--r-- core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java | 2
-rw-r--r-- core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 6
-rw-r--r-- core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java | 22
-rw-r--r-- core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java | 3
-rw-r--r-- core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala | 12
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 6
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala | 2
-rw-r--r-- core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala | 4
47 files changed, 116 insertions, 98 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index d74602cd20..2381cff61f 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -30,7 +30,9 @@ final class ShuffleInMemorySorter {
private static final class SortComparator implements Comparator<PackedRecordPointer> {
@Override
public int compare(PackedRecordPointer left, PackedRecordPointer right) {
- return left.getPartitionId() - right.getPartitionId();
+ int leftId = left.getPartitionId();
+ int rightId = right.getPartitionId();
+ return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
}
}
private static final SortComparator SORT_COMPARATOR = new SortComparator();
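A note on the comparator change above: subtraction-based comparisons can overflow Int and invert the ordering for extreme inputs, which is why the patch spells out the comparison instead. A minimal sketch of the failure mode (illustrative only, not part of the patch, using only standard Scala/Java):

```scala
object CompareOverflowSketch {
  def main(args: Array[String]): Unit = {
    val left = Int.MinValue
    val right = 1
    // The subtraction wraps around to Int.MaxValue, so a subtraction-based
    // comparator would report left > right even though left < right.
    println(left - right)                  // 2147483647
    println(Integer.compare(left, right))  // -1, the correct ordering
  }
}
```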
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 296bf722fc..9236bd2c04 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -550,7 +550,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
/**
* Chain multiple UnsafeSorterIterator together as single one.
*/
- class ChainedIterator extends UnsafeSorterIterator {
+ static class ChainedIterator extends UnsafeSorterIterator {
private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 9aafc9eb1c..b65cfdc4df 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -88,7 +88,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
- shuffleId, _rdd.partitions.size, this)
+ shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index a7c2790c83..976c19f2b0 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,14 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
- val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
+ val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
- new HashPartitioner(bySize.head.partitions.size)
+ new HashPartitioner(bySize.head.partitions.length)
}
}
}
@@ -122,7 +122,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
- val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
+ val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
@@ -137,7 +137,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
- val weight = (n.toDouble / sample.size).toFloat
+ val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e8b735b92..b503c6184a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,8 +23,8 @@ import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
-import java.util.UUID.randomUUID
+import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable
@@ -391,8 +391,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
- _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
- _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
+ _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+ _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
_eventLogDir =
@@ -2310,6 +2310,7 @@ object SparkContext extends Logging {
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
+ @tailrec
private def createTaskScheduler(
sc: SparkContext,
master: String,
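On the @tailrec annotations added throughout this patch: the annotation does not change behavior; it only makes the compiler reject the method if the recursive call is not in tail position and therefore cannot be compiled to a loop. A minimal sketch with a hypothetical countdown method (not from the patch):

```scala
import scala.annotation.tailrec

object TailrecSketch {
  // @tailrec fails compilation if the recursive call is not in tail
  // position; otherwise the recursion is compiled to an iterative loop.
  @tailrec
  def countdown(n: Int): Int =
    if (n <= 0) n else countdown(n - 1)
}
```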
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index c8f201ea9e..509fb2eb0e 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -248,7 +248,6 @@ case class ExecutorLostFailure(
} else {
"unrelated to the running tasks"
}
- s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
reason.map { r => s" Reason: $r" }.getOrElse("")
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 255420182b..a86ee66fb7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
import java.net.{URI, URISyntaxException}
+import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
import org.apache.log4j.Level
@@ -49,6 +50,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
parse(args.toList)
+ @tailrec
private def parse(args: List[String]): Unit = args match {
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d5a3383932..7d7ddccdcf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
import java.net.URL
import java.security.PrivilegedExceptionAction
+import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import org.apache.commons.lang3.StringUtils
@@ -150,6 +151,7 @@ object SparkSubmit {
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
+ @tailrec
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
@@ -721,6 +723,7 @@ object SparkSubmit {
throw new IllegalStateException("The main method in the given main class must be static")
}
+ @tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index d03bab3820..fc3790f8d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,6 +17,8 @@
package org.apache.spark.deploy.history
+import scala.annotation.tailrec
+
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils
@@ -29,6 +31,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
parse(args.toList)
+ @tailrec
private def parse(args: List[String]): Unit = {
if (args.length == 1) {
setLogDirectory(args.head)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 44cefbc77f..9cd7458ba0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -17,6 +17,8 @@
package org.apache.spark.deploy.master
+import scala.annotation.tailrec
+
import org.apache.spark.SparkConf
import org.apache.spark.util.{IntParam, Utils}
@@ -49,6 +51,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = conf.get("spark.master.ui.port").toInt
}
+ @tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 39b2647a90..fb07c39dd0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -32,7 +32,7 @@ private[spark] class MasterSource(val master: Master) extends Source {
// Gauge for alive worker numbers in cluster
metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{
- override def getValue: Int = master.workers.filter(_.state == WorkerState.ALIVE).size
+ override def getValue: Int = master.workers.count(_.state == WorkerState.ALIVE)
})
// Gauge for application numbers in cluster
@@ -42,6 +42,6 @@ private[spark] class MasterSource(val master: Master) extends Source {
// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
- override def getValue: Int = master.apps.filter(_.state == ApplicationState.WAITING).size
+ override def getValue: Int = master.apps.count(_.state == ApplicationState.WAITING)
})
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 540e802420..b0cedef72e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -50,7 +50,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
override def read[T: ClassTag](prefix: String): Seq[T] = {
zk.getChildren.forPath(WORKING_DIR).asScala
- .filter(_.startsWith(prefix)).map(deserializeFromFile[T]).flatten
+ .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
}
override def close() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index f9b0279c3d..363f4b84f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -107,18 +107,18 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</li>
}.getOrElse { Seq.empty }
}
- <li><strong>Alive Workers:</strong> {aliveWorkers.size}</li>
+ <li><strong>Alive Workers:</strong> {aliveWorkers.length}</li>
<li><strong>Cores in use:</strong> {aliveWorkers.map(_.cores).sum} Total,
{aliveWorkers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory in use:</strong>
{Utils.megabytesToString(aliveWorkers.map(_.memory).sum)} Total,
{Utils.megabytesToString(aliveWorkers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Applications:</strong>
- {state.activeApps.size} Running,
- {state.completedApps.size} Completed </li>
+ {state.activeApps.length} Running,
+ {state.completedApps.length} Completed </li>
<li><strong>Drivers:</strong>
- {state.activeDrivers.size} Running,
- {state.completedDrivers.size} Completed </li>
+ {state.activeDrivers.length} Running,
+ {state.completedDrivers.length} Completed </li>
<li><strong>Status:</strong> {state.status}</li>
</ul>
</div>
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
index 5accaf78d0..38935e3209 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -17,6 +17,8 @@
package org.apache.spark.deploy.mesos
+import scala.annotation.tailrec
+
import org.apache.spark.SparkConf
import org.apache.spark.util.{IntParam, Utils}
@@ -34,6 +36,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+ @tailrec
private def parse(args: List[String]): Unit = args match {
case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 4ec6bfe2f9..006e2e1472 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -382,7 +382,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
logWarning(s"Unable to connect to server ${masterUrl}.")
lostMasters += masterUrl
}
- lostMasters.size >= masters.size
+ lostMasters.size >= masters.length
}
}
@@ -412,13 +412,13 @@ private[spark] object RestSubmissionClient {
}
def main(args: Array[String]): Unit = {
- if (args.size < 2) {
+ if (args.length < 2) {
sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
sys.exit(1)
}
val appResource = args(0)
val mainClass = args(1)
- val appArgs = args.slice(2, args.size)
+ val appArgs = args.slice(2, args.length)
val conf = new SparkConf
val env = filterSystemEnvironment(sys.env)
run(appResource, mainClass, appArgs, conf, env)
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index a8b2f78889..3b96488a12 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -94,7 +94,7 @@ private[mesos] class MesosSubmitRequestServlet(
val driverCores = sparkProperties.get("spark.driver.cores")
val appArgs = request.appArgs
val environmentVariables = request.environmentVariables
- val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
+ val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
// Construct driver description
val conf = new SparkConf(false).setAll(sparkProperties)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index df3c286a0a..1c24c631ee 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -148,7 +148,7 @@ private[deploy] class Worker(
// time so that we can register with all masters.
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"worker-register-master-threadpool",
- masterRpcAddresses.size // Make sure we can register with all masters at the same time
+ masterRpcAddresses.length // Make sure we can register with all masters at the same time
)
var coresUsed = 0
@@ -445,13 +445,12 @@ private[deploy] class Worker(
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
- val appLocalDirs = appDirectories.get(appId).getOrElse {
+ val appLocalDirs = appDirectories.getOrElse(appId,
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
appDir.getAbsolutePath()
- }.toSeq
- }
+ }.toSeq)
appDirectories(appId) = appLocalDirs
val manager = new ExecutorRunner(
appId,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index de3c7cd265..391eb41190 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
import java.lang.management.ManagementFactory
+import scala.annotation.tailrec
+
import org.apache.spark.util.{IntParam, MemoryParam, Utils}
import org.apache.spark.SparkConf
@@ -63,6 +65,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
checkWorkerMemory()
+ @tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
index d25452daf7..b089bbd7e9 100644
--- a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
@@ -38,7 +38,7 @@ private[spark] class ApproximateActionListener[T, U, R](
extends JobListener {
val startTime = System.currentTimeMillis()
- val totalTasks = rdd.partitions.size
+ val totalTasks = rdd.partitions.length
var finishedTasks = 0
var failure: Option[Exception] = None // Set if the job has failed (permanently)
var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index bfe19195fc..a163bbd264 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -41,7 +41,7 @@ private[spark] class LocalCheckpointRDD[T: ClassTag](
extends CheckpointRDD[T](sc) {
def this(rdd: RDD[T]) {
- this(rdd.context, rdd.id, rdd.partitions.size)
+ this(rdd.context, rdd.id, rdd.partitions.length)
}
protected override def getPartitions: Array[Partition] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ba773e1e7b..e2eaef5ec4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
+import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{HashMap, HashSet, Stack}
import scala.concurrent.Await
@@ -469,6 +470,7 @@ class DAGScheduler(
* all of that stage's ancestors.
*/
private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
+ @tailrec
def updateJobIdStageIdMapsList(stages: List[Stage]) {
if (stages.nonEmpty) {
val s = stages.head
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 29341dfe30..8b2f4973ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -620,7 +620,7 @@ private[spark] object TaskSchedulerImpl {
while (found) {
found = false
for (key <- keyList) {
- val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
+ val containerList: ArrayBuffer[T] = map.getOrElse(key, null)
assert(containerList != null)
// Get the index'th entry for this host - if present
if (index < containerList.size){
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2b0eab7169..f1339d530a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -202,7 +202,7 @@ private[spark] class TaskSetManager(
", but there are no executors alive there.")
}
}
- case _ => Unit
+ case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
@@ -828,7 +828,7 @@ private[spark] class TaskSetManager(
val time = clock.getTimeMillis()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
- val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
+ val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0edb..8b72da2ee0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -68,7 +68,7 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa
// scalastyle:on classforname
} catch {
case e: ClassNotFoundException =>
- JavaDeserializationStream.primitiveMappings.get(desc.getName).getOrElse(throw e)
+ JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e)
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 7750a09623..5c03609e5e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -61,7 +61,7 @@ private[spark] object AllRDDResource {
.flatMap { _.rddBlocksById(rddId) }
.sortWith { _._1.name < _._1.name }
.map { case (blockId, status) =>
- (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+ (blockId, status, blockLocations.getOrElse(blockId, Seq[String]("Unknown")))
}
val dataDistribution = if (includeDetails) {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
index a0f6360bc5..653150385c 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -30,7 +30,7 @@ private[v1] class OneJobResource(ui: SparkUI) {
def oneJob(@PathParam("jobId") jobId: Int): JobData = {
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
AllJobsResource.getStatusToJobs(ui)
- val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
+ val jobOpt = statusToJobs.flatMap(_._2).find { jobInfo => jobInfo.jobId == jobId}
jobOpt.map { job =>
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
}.getOrElse {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 8e2cfb2441..43cd15921c 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -82,9 +82,7 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }
/** Return the blocks that belong to the given RDD stored in this block manager. */
- def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
- _rddBlocks.get(rddId).getOrElse(Map.empty)
- }
+ def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = _rddBlocks.getOrElse(rddId, Map.empty)
/** Add the given block to this storage status. If it already exists, overwrite it. */
private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
@@ -143,7 +141,7 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def getBlock(blockId: BlockId): Option[BlockStatus] = {
blockId match {
case RDDBlockId(rddId, _) =>
- _rddBlocks.get(rddId).map(_.get(blockId)).flatten
+ _rddBlocks.get(rddId).flatMap(_.get(blockId))
case _ =>
_nonRddBlocks.get(blockId)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 77c0bc8b53..f157a451ef 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -63,7 +63,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
return
}
val stageIds = sc.statusTracker.getActiveStageIds()
- val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1)
+ val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
.filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 77ca60b000..2fd630a85c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -29,7 +29,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
val operationGraphListener = parent.operationGraphListener
def isFairScheduler: Boolean =
- jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+ jobProgresslistener.schedulingMode.contains(SchedulingMode.FAIR)
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 5989f0035b..ece5d0fce8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -34,7 +34,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
attachPage(new StagePage(this))
attachPage(new PoolPage(this))
- def isFairScheduler: Boolean = progressListener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+ def isFairScheduler: Boolean = progressListener.schedulingMode.contains(SchedulingMode.FAIR)
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index 89119cd357..bcae56e2f1 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -52,9 +52,8 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
* An empty list is returned if one or more of its stages has been cleaned up.
*/
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
- val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty)
- val graphs = jobIdToStageIds.get(jobId)
- .getOrElse(Seq.empty)
+ val skippedStageIds = jobIdToSkippedStageIds.getOrElse(jobId, Seq.empty)
+ val graphs = jobIdToStageIds.getOrElse(jobId, Seq.empty)
.flatMap { sid => stageIdToGraph.get(sid) }
// Mark any skipped stages as such
graphs.foreach { g =>
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index c9bb49b83e..76d7c6d414 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -156,7 +156,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
streamBlockTableSubrow(block._1, replications.head, replications.size, true)
} else {
streamBlockTableSubrow(block._1, replications.head, replications.size, true) ++
- replications.tail.map(streamBlockTableSubrow(block._1, _, replications.size, false)).flatten
+ replications.tail.flatMap(streamBlockTableSubrow(block._1, _, replications.size, false))
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index cfe247c668..9688cca4f0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -27,6 +27,7 @@ import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
+import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
@@ -2219,6 +2220,7 @@ private[spark] object Utils extends Logging {
/**
* Return whether the specified file is a parent directory of the child file.
*/
+ @tailrec
def isInDirectory(parent: File, child: File): Boolean = {
if (child == null || parent == null) {
return false
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 050ece12f1..a0eb05c7c0 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -117,7 +117,7 @@ private[spark] class RollingFileAppender(
}
}).sorted
val filesToBeDeleted = rolledoverFiles.take(
- math.max(0, rolledoverFiles.size - maxRetainedFiles))
+ math.max(0, rolledoverFiles.length - maxRetainedFiles))
filesToBeDeleted.foreach { file =>
logInfo(s"Deleting file executor log file ${file.getAbsolutePath}")
file.delete()
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 776a2997cf..127789b632 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -73,37 +73,37 @@ public class TaskMemoryManagerSuite {
TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
c1.use(100);
- assert(c1.getUsed() == 100);
+ Assert.assertEquals(100, c1.getUsed());
c2.use(100);
- assert(c2.getUsed() == 100);
- assert(c1.getUsed() == 0); // spilled
+ Assert.assertEquals(100, c2.getUsed());
+ Assert.assertEquals(0, c1.getUsed()); // spilled
c1.use(100);
- assert(c1.getUsed() == 100);
- assert(c2.getUsed() == 0); // spilled
+ Assert.assertEquals(100, c1.getUsed());
+ Assert.assertEquals(0, c2.getUsed()); // spilled
c1.use(50);
- assert(c1.getUsed() == 50); // spilled
- assert(c2.getUsed() == 0);
+ Assert.assertEquals(50, c1.getUsed()); // spilled
+ Assert.assertEquals(0, c2.getUsed());
c2.use(50);
- assert(c1.getUsed() == 50);
- assert(c2.getUsed() == 50);
+ Assert.assertEquals(50, c1.getUsed());
+ Assert.assertEquals(50, c2.getUsed());
c1.use(100);
- assert(c1.getUsed() == 100);
- assert(c2.getUsed() == 0); // spilled
+ Assert.assertEquals(100, c1.getUsed());
+ Assert.assertEquals(0, c2.getUsed()); // spilled
c1.free(20);
- assert(c1.getUsed() == 80);
+ Assert.assertEquals(80, c1.getUsed());
c2.use(10);
- assert(c1.getUsed() == 80);
- assert(c2.getUsed() == 10);
+ Assert.assertEquals(80, c1.getUsed());
+ Assert.assertEquals(10, c2.getUsed());
c2.use(100);
- assert(c2.getUsed() == 100);
- assert(c1.getUsed() == 0); // spilled
+ Assert.assertEquals(100, c2.getUsed());
+ Assert.assertEquals(0, c1.getUsed()); // spilled
c1.free(0);
c2.free(100);
- assert(manager.cleanUpAllAllocatedMemory() == 0);
+ Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
}
@Test
@@ -114,7 +114,7 @@ public class TaskMemoryManagerSuite {
.set("spark.unsafe.offHeap", "true")
.set("spark.memory.offHeap.size", "1000");
final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
- assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP);
+ Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode);
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index eb1da8e1b4..b4fa33f32a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -48,7 +48,7 @@ public class ShuffleInMemorySorterSuite {
public void testSortingEmptyInput() {
final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 100);
final ShuffleInMemorySorter.ShuffleSorterIterator iter = sorter.getSortedIterator();
- assert(!iter.hasNext());
+ Assert.assertFalse(iter.hasNext());
}
@Test
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 876c3a2283..add9d937d3 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -139,7 +139,7 @@ public class UnsafeShuffleWriterSuite {
new Answer<InputStream>() {
@Override
public InputStream answer(InvocationOnMock invocation) throws Throwable {
- assert (invocation.getArguments()[0] instanceof TempShuffleBlockId);
+ assertTrue(invocation.getArguments()[0] instanceof TempShuffleBlockId);
InputStream is = (InputStream) invocation.getArguments()[1];
if (conf.getBoolean("spark.shuffle.compress", true)) {
return CompressionCodec$.MODULE$.createCodec(conf).compressedInputStream(is);
@@ -154,7 +154,7 @@ public class UnsafeShuffleWriterSuite {
new Answer<OutputStream>() {
@Override
public OutputStream answer(InvocationOnMock invocation) throws Throwable {
- assert (invocation.getArguments()[0] instanceof TempShuffleBlockId);
+ assertTrue(invocation.getArguments()[0] instanceof TempShuffleBlockId);
OutputStream os = (OutputStream) invocation.getArguments()[1];
if (conf.getBoolean("spark.shuffle.compress", true)) {
return CompressionCodec$.MODULE$.createCodec(conf).compressedOutputStream(os);
@@ -252,7 +252,7 @@ public class UnsafeShuffleWriterSuite {
createWriter(false).stop(false);
}
- class PandaException extends RuntimeException {
+ static class PandaException extends RuntimeException {
}
@Test(expected=PandaException.class)
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 32f5a1a7e6..492fe49ba4 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -323,23 +323,23 @@ public class UnsafeExternalSorterSuite {
record[0] = (long) i;
sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0);
}
- assert(sorter.getNumberOfAllocatedPages() >= 2);
+ assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
UnsafeExternalSorter.SpillableIterator iter =
(UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
int lastv = 0;
for (int i = 0; i < n / 3; i++) {
iter.hasNext();
iter.loadNext();
- assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ assertTrue(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
lastv = i;
}
- assert(iter.spill() > 0);
- assert(iter.spill() == 0);
- assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == lastv);
+ assertTrue(iter.spill() > 0);
+ assertEquals(0, iter.spill());
+ assertTrue(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == lastv);
for (int i = n / 3; i < n; i++) {
iter.hasNext();
iter.loadNext();
- assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ assertEquals(i, Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()));
}
sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
@@ -355,15 +355,15 @@ public class UnsafeExternalSorterSuite {
record[0] = (long) i;
sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0);
}
- assert(sorter.getNumberOfAllocatedPages() >= 2);
+ assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
UnsafeExternalSorter.SpillableIterator iter =
(UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
- assert(iter.spill() > 0);
- assert(iter.spill() == 0);
+ assertTrue(iter.spill() > 0);
+ assertEquals(0, iter.spill());
for (int i = 0; i < n; i++) {
iter.hasNext();
iter.loadNext();
- assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ assertEquals(i, Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()));
}
sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
@@ -394,7 +394,7 @@ public class UnsafeExternalSorterSuite {
for (int i = 0; i < n; i++) {
iter.hasNext();
iter.loadNext();
- assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ assertEquals(i, Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()));
}
sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 8e557ec0ab..ff41768df1 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.util.Arrays;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.spark.HashPartitioner;
@@ -54,7 +55,7 @@ public class UnsafeInMemorySorterSuite {
mock(PrefixComparator.class),
100);
final UnsafeSorterIterator iter = sorter.getSortedIterator();
- assert(!iter.hasNext());
+ Assert.assertFalse(iter.hasNext());
}
@Test
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 8acd0439b6..4ff8ae57ab 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -347,7 +347,7 @@ private class SaveInfoListener extends SparkListener {
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.values.flatten.toSeq
def getCompletedTaskInfos(stageId: StageId, stageAttemptId: StageAttemptId): Seq[TaskInfo] =
- completedTaskInfos.get((stageId, stageAttemptId)).getOrElse(Seq.empty[TaskInfo])
+ completedTaskInfos.getOrElse((stageId, stageAttemptId), Seq.empty[TaskInfo])
/**
* If `jobCompletionCallback` is set, block until the next call has finished.
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 2fe99e3f81..79881f30b2 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -237,7 +237,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.set(newName, "4")
assert(conf.get(newName) === "4")
- val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
+ val count = conf.getAll.count { case (k, v) => k.startsWith("spark.history.") }
assert(count === 4)
conf.set("spark.yarn.applicationMaster.waitTries", "42")
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index e7cc1617cd..31ce9483cf 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -101,7 +101,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val data = 1 until 100
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_ + _) === 99)
+ assert(slices.map(_.size).sum === 99)
assert(slices.forall(_.isInstanceOf[Range]))
}
@@ -109,7 +109,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val data = 1 to 100
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_ + _) === 100)
+ assert(slices.map(_.size).sum === 100)
assert(slices.forall(_.isInstanceOf[Range]))
}
@@ -202,7 +202,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val data = 1L until 100L
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_ + _) === 99)
+ assert(slices.map(_.size).sum === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
@@ -210,7 +210,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val data = 1L to 100L
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_ + _) === 100)
+ assert(slices.map(_.size).sum === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
@@ -218,7 +218,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val data = 1.0 until 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_ + _) === 99)
+ assert(slices.map(_.size).sum === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
@@ -226,7 +226,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
val data = 1.0 to 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_ + _) === 100)
+ assert(slices.map(_.size).sum === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 80347b800a..24daedab20 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -54,16 +54,16 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
assert(!nums.isEmpty())
assert(nums.max() === 4)
assert(nums.min() === 1)
- val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
+ val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum))
assert(partitionSums.collect().toList === List(3, 7))
val partitionSumsWithSplit = nums.mapPartitionsWithIndex {
- case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
+ case(split, iter) => Iterator((split, iter.sum))
}
assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
val partitionSumsWithIndex = nums.mapPartitionsWithIndex {
- case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
+ case(split, iter) => Iterator((split, iter.sum))
}
assert(partitionSumsWithIndex.collect().toList === List((0, 3), (1, 7)))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 56e0f01b3b..759d52fca5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -79,7 +79,7 @@ class MapStatusSuite extends SparkFunSuite {
test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
val sizes = Array.tabulate[Long](3000) { i => i.toLong }
- val avg = sizes.sum / sizes.filter(_ != 0).length
+ val avg = sizes.sum / sizes.count(_ != 0)
val loc = BlockManagerId("a", "b", 10)
val status = MapStatus(loc, sizes)
val status1 = compressAndDecompressMapStatus(status)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index b5385c11a9..935e280e60 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -243,7 +243,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
val resAfter = captor.getValue
val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
- assert(resSizeBefore.exists(_ == 0L))
+ assert(resSizeBefore.contains(0L))
assert(resSizeAfter.exists(_.toString.toLong > 0L))
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index fdacd8c9f5..cf9f9da1e6 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -166,7 +166,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
writer.stop( /* success = */ true)
assert(temporaryFilesCreated.nonEmpty)
assert(writer.getPartitionLengths.sum === outputFile.length())
- assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
+ assert(writer.getPartitionLengths.count(_ == 0L) === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
diff --git a/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
index ddd5edf4f7..0c8b8cfdd5 100644
--- a/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
+++ b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
@@ -23,9 +23,7 @@ import org.apache.spark.SparkConf
* Customized SparkConf that allows env variables to be overridden.
*/
class SparkConfWithEnv(env: Map[String, String]) extends SparkConf(false) {
- override def getenv(name: String): String = {
- env.get(name).getOrElse(super.getenv(name))
- }
+ override def getenv(name: String): String = env.getOrElse(name, super.getenv(name))
override def clone: SparkConf = {
new SparkConfWithEnv(env).setAll(getAll)