author     Reynold Xin <rxin@databricks.com>  2015-04-12 20:50:49 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-04-12 20:50:49 -0700
commit     a1fe59dae50f551d02dd18676308eca054ff6b07 (patch)
tree       e7f696a3c714923548fff2c3638dbdd710845ac9 /core
parent     04bcd67cfc50f847559a9ff59a31aa93028b3628 (diff)
[SPARK-6765] Fix test code style for core.
Author: Reynold Xin <rxin@databricks.com>

Closes #5484 from rxin/test-style-core and squashes the following commits:

e0b0100 [Reynold Xin] [SPARK-6765] Fix test code style for core.
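The fixes in this patch follow a handful of recurring conventions visible throughout the diff below: explicit result types on public and implicit members, spaces around operators (`_ + _` rather than `_+_`), uppercase `L` on long literals, wrapping calls and assertions that exceed the line-length limit, a space after `//` in comments, and a trailing newline at end of file. As a quick orientation, here is a minimal standalone Scala sketch of those conventions; every name in it is illustrative and none of it comes from the patch itself.

object TestStyleSketch {

  // Public members carry explicit result types instead of relying on inference.
  def sum(values: Seq[Int]): Int = values.reduce(_ + _)  // spaces around operators

  // Long literals use the uppercase suffix so `L` cannot be misread as `1`.
  val bigCount: Long = 210L

  // Calls that would push a line past the length limit are wrapped after the
  // opening parenthesis rather than left on a single long line.
  def label(prefix: String, values: Seq[Int]): String =
    "%s: %d elements, sum %d".format(
      prefix, values.length, sum(values))

  // ------ Helpers ------  (inline comments keep a space after the slashes)
  def main(args: Array[String]): Unit = {
    println(label("sample", 1 to 20))
  }
}

Beyond readability, explicit result types on public and implicit members keep the inferred API of the test helpers stable, which is why most hunks in this diff simply annotate a return type without changing behavior.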
Diffstat (limited to 'core')
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala  30
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala  7
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala  15
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala  26
-rw-r--r--  core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/LocalSparkContext.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala  30
-rw-r--r--  core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala  15
-rw-r--r--  core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala  7
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala  8
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala  5
-rw-r--r--  core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala  15
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala  41
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala  20
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala  7
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala  22
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala  40
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  103
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala  21
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala  10
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala  26
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala  14
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala  15
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala  8
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala  5
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala  32
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala  13
-rw-r--r--  core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala  5
-rw-r--r--  core/src/test/scala/org/apache/spark/util/VectorSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala  35
-rw-r--r--  core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala  2
59 files changed, 386 insertions, 304 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index bd0f8bdefa..75399461f2 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -27,19 +27,20 @@ import org.scalatest.Matchers
class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
- implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
- def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
- t1 ++= t2
- t1
- }
- def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
- t1 += t2
- t1
- }
- def zero(t: mutable.Set[A]) : mutable.Set[A] = {
- new mutable.HashSet[A]()
+ implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
+ new AccumulableParam[mutable.Set[A], A] {
+ def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+ new mutable.HashSet[A]()
+ }
}
- }
test ("basic accumulation"){
sc = new SparkContext("local", "test")
@@ -49,11 +50,10 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
d.foreach{x => acc += x}
acc.value should be (210)
-
- val longAcc = sc.accumulator(0l)
+ val longAcc = sc.accumulator(0L)
val maxInt = Integer.MAX_VALUE.toLong
d.foreach{x => longAcc += maxInt + x}
- longAcc.value should be (210l + maxInt * 20)
+ longAcc.value should be (210L + maxInt * 20)
}
test ("value not assignable from tasks") {
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4b25c200a6..70529d9216 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -45,16 +45,17 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(split)
override val getDependencies = List[Dependency[_]]()
- override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
+ override def compute(split: Partition, context: TaskContext): Iterator[Int] =
+ Array(1, 2, 3, 4).iterator
}
rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
override def getPartitions: Array[Partition] = firstParent[Int].partitions
- override def compute(split: Partition, context: TaskContext) =
+ override def compute(split: Partition, context: TaskContext): Iterator[Int] =
firstParent[Int].iterator(split, context)
}.cache()
rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
override def getPartitions: Array[Partition] = firstParent[Int].partitions
- override def compute(split: Partition, context: TaskContext) =
+ override def compute(split: Partition, context: TaskContext): Iterator[Int] =
firstParent[Int].iterator(split, context)
}.cache()
}
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 32abc65385..e1faddeabe 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -75,7 +75,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
assert(parCollection.partitions.length === numPartitions)
- assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
+ assert(parCollection.partitions.toList ===
+ parCollection.checkpointData.get.getPartitions.toList)
assert(parCollection.collect() === result)
}
@@ -102,13 +103,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("UnionRDD") {
- def otherRDD = sc.makeRDD(1 to 10, 1)
+ def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
testRDD(_.union(otherRDD))
testRDDPartitions(_.union(otherRDD))
}
test("CartesianRDD") {
- def otherRDD = sc.makeRDD(1 to 10, 1)
+ def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
testRDD(new CartesianRDD(sc, _, otherRDD))
testRDDPartitions(new CartesianRDD(sc, _, otherRDD))
@@ -223,7 +224,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val partitionAfterCheckpoint = serializeDeserialize(
unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
assert(
- partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
+ partitionBeforeCheckpoint.parents.head.getClass !=
+ partitionAfterCheckpoint.parents.head.getClass,
"PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
)
}
@@ -358,7 +360,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* Generate an pair RDD (with partitioner) such that both the RDD and its partitions
* have large size.
*/
- def generateFatPairRDD() = {
+ def generateFatPairRDD(): RDD[(Int, Int)] = {
new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
}
@@ -445,7 +447,8 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
- def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
+ def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
+ : RDD[(K, Array[Iterable[V]])] = {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index cdfaacee7d..1de169d964 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -64,7 +64,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
}
}
- //------ Helper functions ------
+ // ------ Helper functions ------
protected def newRDD() = sc.makeRDD(1 to 10)
protected def newPairRDD() = newRDD().map(_ -> 1)
@@ -370,7 +370,7 @@ class CleanerTester(
val cleanerListener = new CleanerListener {
def rddCleaned(rddId: Int): Unit = {
toBeCleanedRDDIds -= rddId
- logInfo("RDD "+ rddId + " cleaned")
+ logInfo("RDD " + rddId + " cleaned")
}
def shuffleCleaned(shuffleId: Int): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 7acd27c735..c8f08eed47 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -222,7 +222,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsSequenceFile(outputDir)
val output =
- sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
+ sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}
@@ -451,7 +451,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
- val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val randomRDD = sc.parallelize(
+ Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
}
@@ -459,8 +460,10 @@ class FileSuite extends FunSuite with LocalSparkContext {
test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
- val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ val randomRDD = sc.parallelize(
+ Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+ tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
@@ -471,16 +474,20 @@ class FileSuite extends FunSuite with LocalSparkContext {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
- val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ val randomRDD = sc.parallelize(
+ Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+ tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
- randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+ tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
}
test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
- val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val randomRDD = sc.parallelize(
+ Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
@@ -492,7 +499,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
- val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val randomRDD = sc.parallelize(
+ Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
index d895230ecf..51348c039b 100644
--- a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
@@ -51,7 +51,7 @@ private object ImplicitOrderingSuite {
override def compare(o: OrderedClass): Int = ???
}
- def basicMapExpectations(rdd: RDD[Int]) = {
+ def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
List((rdd.map(x => (x, x)).keyOrdering.isDefined,
"rdd.map(x => (x, x)).keyOrdering.isDefined"),
(rdd.map(x => (1, x)).keyOrdering.isDefined,
@@ -68,7 +68,7 @@ private object ImplicitOrderingSuite {
"rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined"))
}
- def otherRDDMethodExpectations(rdd: RDD[Int]) = {
+ def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
List((rdd.groupBy(x => x).keyOrdering.isDefined,
"rdd.groupBy(x => x).keyOrdering.isDefined"),
(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty,
@@ -82,4 +82,4 @@ private object ImplicitOrderingSuite {
(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined,
"rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined"))
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 21487bc24d..4d3e09793f 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -188,7 +188,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
val rdd = sc.parallelize(1 to 10, 2).map { i =>
JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
- }.reduceByKey(_+_)
+ }.reduceByKey(_ + _)
val f1 = rdd.collectAsync()
val f2 = rdd.countAsync()
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 53e367a617..8bf2e55def 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -37,7 +37,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
super.afterEach()
}
- def resetSparkContext() = {
+ def resetSparkContext(): Unit = {
LocalSparkContext.stop(sc)
sc = null
}
@@ -54,7 +54,7 @@ object LocalSparkContext {
}
/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
- def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+ def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
try {
f(sc)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index b7532314ad..47e3bf6e1a 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -92,7 +92,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
// Row does not extend Comparable, but has an implicit Ordering defined.
implicit object RowOrdering extends Ordering[Row] {
- override def compare(x: Row, y: Row) = x.value - y.value
+ override def compare(x: Row, y: Row): Int = x.value - y.value
}
val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
@@ -212,20 +212,24 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
val arrPairs: RDD[(Array[Int], Int)] =
sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
- assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
+ def verify(testFun: => Unit): Unit = {
+ intercept[SparkException](testFun).getMessage.contains("array")
+ }
+
+ verify(arrs.distinct())
// We can't catch all usages of arrays, since they might occur inside other collections:
// assert(fails { arrPairs.distinct() })
- assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.cogroup(arrPairs) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
- assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
+ verify(arrPairs.partitionBy(new HashPartitioner(2)))
+ verify(arrPairs.join(arrPairs))
+ verify(arrPairs.leftOuterJoin(arrPairs))
+ verify(arrPairs.rightOuterJoin(arrPairs))
+ verify(arrPairs.fullOuterJoin(arrPairs))
+ verify(arrPairs.groupByKey())
+ verify(arrPairs.countByKey())
+ verify(arrPairs.countByKeyApprox(1))
+ verify(arrPairs.cogroup(arrPairs))
+ verify(arrPairs.reduceByKeyLocally(_ + _))
+ verify(arrPairs.reduceByKey(_ + _))
}
test("zero-length partitions should be correctly handled") {
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 444a33371b..93f46ef11c 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -36,7 +36,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
- conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+ conf.set("spark.ssl.enabledAlgorithms",
+ "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")
val opts = SSLOptions.parse(conf, "spark.ssl")
@@ -52,7 +53,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
- assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+ assert(opts.enabledAlgorithms ===
+ Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
}
test("test resolving property with defaults specified ") {
@@ -66,7 +68,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
- conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+ conf.set("spark.ssl.enabledAlgorithms",
+ "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")
val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
@@ -83,7 +86,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
assert(opts.protocol === Some("SSLv3"))
- assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+ assert(opts.enabledAlgorithms ===
+ Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
}
test("test whether defaults can be overridden ") {
@@ -99,7 +103,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
- conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+ conf.set("spark.ssl.enabledAlgorithms",
+ "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
conf.set("spark.ssl.protocol", "SSLv3")
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
index ace8123a89..308b9ea177 100644
--- a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -21,10 +21,11 @@ import java.io.File
object SSLSampleConfigs {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
- val untrustedKeyStorePath = new File(this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
+ val untrustedKeyStorePath = new File(
+ this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
- def sparkSSLConfig() = {
+ def sparkSSLConfig(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
@@ -38,7 +39,7 @@ object SSLSampleConfigs {
conf
}
- def sparkSSLConfigUntrusted() = {
+ def sparkSSLConfigUntrusted(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", untrustedKeyStorePath)
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 30b6184c77..d718051602 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -142,7 +142,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("shuffle on mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
val results = new ShuffledRDD[Int, Int, Int](pairs,
@@ -155,7 +155,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
// This is not in SortingSuite because of the local cluster setup.
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
@@ -169,7 +169,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("cogroup using mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
@@ -196,7 +196,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
test("subtract mutable pairs") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index c7301a30d8..94be1c6d63 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -114,11 +114,13 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
if (length1 != gotten1.length()) {
throw new SparkException(
- s"file has different length $length1 than added file ${gotten1.length()} : " + absolutePath1)
+ s"file has different length $length1 than added file ${gotten1.length()} : " +
+ absolutePath1)
}
if (length2 != gotten2.length()) {
throw new SparkException(
- s"file has different length $length2 than added file ${gotten2.length()} : " + absolutePath2)
+ s"file has different length $length2 than added file ${gotten2.length()} : " +
+ absolutePath2)
}
if (absolutePath1 == gotten1.getAbsolutePath) {
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index 41d6ea29d5..084eb237d7 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -82,7 +82,8 @@ class StatusTrackerSuite extends FunSuite with Matchers with LocalSparkContext {
secondJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
- sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+ sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (
+ Set(firstJobId, secondJobId))
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index af3272692d..c8fdfa6939 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -33,7 +33,7 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
val broadcast = rdd.context.broadcast(list)
val bid = broadcast.id
- def doSomething() = {
+ def doSomething(): Set[(Int, Boolean)] = {
rdd.map { x =>
val bm = SparkEnv.get.blockManager
// Check if broadcast block was fetched
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 68b5776fc6..2071701b31 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -100,13 +100,13 @@ class JsonProtocolSuite extends FunSuite {
appInfo
}
- def createDriverCommand() = new Command(
+ def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)
- def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
- false, createDriverCommand())
+ def createDriverDesc(): DriverDescription =
+ new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 54dd7c9c45..9cdb42814c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -56,7 +56,7 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
val SPARK_PUBLIC_DNS = "public_dns"
class MySparkConf extends SparkConf(false) {
- override def getenv(name: String) = {
+ override def getenv(name: String): String = {
if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
else super.getenv(name)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 3a9963a5ce..20de46fdab 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -42,10 +42,10 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
val page = new HistoryPage(historyServer)
- //when
+ // when
val response = page.render(request)
- //then
+ // then
val links = response \\ "a"
val justHrefs = for {
l <- links
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 2fa90e3bd1..8e09976636 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -508,7 +508,7 @@ private class DummyMaster(
exception: Option[Exception] = None)
extends Actor {
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case RequestSubmitDriver(driverDesc) =>
sender ! SubmitDriverResponse(success = true, Some(submitId), submitMessage)
case RequestKillDriver(driverId) =>
@@ -531,7 +531,7 @@ private class SmarterMaster extends Actor {
private var counter: Int = 0
private val submittedDrivers = new mutable.HashMap[String, DriverState]
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case RequestSubmitDriver(driverDesc) =>
val driverId = s"driver-$counter"
submittedDrivers(driverId) = RUNNING
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 1d64ec201e..61071ee172 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -129,7 +129,8 @@ class SubmitRestProtocolSuite extends FunSuite {
assert(newMessage.sparkProperties("spark.files") === "fireball.png")
assert(newMessage.sparkProperties("spark.driver.memory") === "512m")
assert(newMessage.sparkProperties("spark.driver.cores") === "180")
- assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") === " -Dslices=5 -Dcolor=mostly_red")
+ assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") ===
+ " -Dslices=5 -Dcolor=mostly_red")
assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar")
assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar")
assert(newMessage.sparkProperties("spark.driver.supervise") === "false")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 6fca6321e5..a8b9df227c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -35,7 +35,8 @@ class ExecutorRunnerTest extends FunSuite {
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
"publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
- val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
+ val builder = CommandUtils.buildProcessBuilder(
+ appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 372d7aa453..7cc2104281 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -37,7 +37,7 @@ class WorkerArgumentsTest extends FunSuite {
val args = Array("spark://localhost:0000 ")
class MySparkConf extends SparkConf(false) {
- override def getenv(name: String) = {
+ override def getenv(name: String): String = {
if (name == "SPARK_WORKER_MEMORY") "50000"
else super.getenv(name)
}
@@ -56,7 +56,7 @@ class WorkerArgumentsTest extends FunSuite {
val args = Array("spark://localhost:0000 ")
class MySparkConf extends SparkConf(false) {
- override def getenv(name: String) = {
+ override def getenv(name: String): String = {
if (name == "SPARK_WORKER_MEMORY") "5G"
else super.getenv(name)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 84e2fd7ad9..450fba21f4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -24,8 +24,10 @@ import org.scalatest.{Matchers, FunSuite}
class WorkerSuite extends FunSuite with Matchers {
- def cmd(javaOpts: String*) = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
- def conf(opts: (String, String)*) = new SparkConf(loadDefaults = false).setAll(opts)
+ def cmd(javaOpts: String*): Command = {
+ Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
+ }
+ def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
test("test isUseLocalNodeSSLConfig") {
Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 78fa98a3b9..190b08d950 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -238,7 +238,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
sc.textFile(tmpFilePath, 4)
.map(key => (key, 1))
- .reduceByKey(_+_)
+ .reduceByKey(_ + _)
.saveAsTextFile("file://" + tmpFile.getAbsolutePath)
sc.listenerBus.waitUntilEmpty(500)
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index 37e528435a..100ac77dec 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -35,7 +35,8 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val property = conf.getInstance("random")
assert(property.size() === 2)
- assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(property.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
@@ -47,16 +48,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(masterProp.size() === 5)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
- assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
- assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("source.jvm.class") ===
+ "org.apache.spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 5)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
- assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
- assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("source.jvm.class") ===
+ "org.apache.spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 0dc59888f7..be8467354b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -80,7 +80,7 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
(r: ResultSet) => { r.getInt(1) } ).cache()
assert(rdd.count === 100)
- assert(rdd.reduce(_+_) === 10100)
+ assert(rdd.reduce(_ + _) === 10100)
}
test("large id overflow") {
@@ -92,7 +92,7 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
1131544775L, 567279358897692673L, 20,
(r: ResultSet) => { r.getInt(1) } ).cache()
assert(rdd.count === 100)
- assert(rdd.reduce(_+_) === 5050)
+ assert(rdd.reduce(_ + _) === 5050)
}
after {
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 108f70af43..ca0d953d30 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -168,13 +168,13 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("reduceByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.reduceByKey(_+_).collect()
+ val sums = pairs.reduceByKey(_ + _).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
test("reduceByKey with collectAsMap") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.reduceByKey(_+_).collectAsMap()
+ val sums = pairs.reduceByKey(_ + _).collectAsMap()
assert(sums.size === 2)
assert(sums(1) === 7)
assert(sums(2) === 1)
@@ -182,7 +182,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("reduceByKey with many output partitons") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.reduceByKey(_+_, 10).collect()
+ val sums = pairs.reduceByKey(_ + _, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
@@ -192,7 +192,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
def getPartition(key: Any) = key.asInstanceOf[Int]
}
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
- val sums = pairs.reduceByKey(_+_)
+ val sums = pairs.reduceByKey(_ + _)
assert(sums.collect().toSet === Set((1, 4), (0, 1)))
assert(sums.partitioner === Some(p))
// count the dependencies to make sure there is only 1 ShuffledRDD
@@ -208,7 +208,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
}
test("countApproxDistinctByKey") {
- def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+ def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble
/* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
* only a statistical bound, the tests can fail for large values of relativeSD. We will be using
@@ -465,7 +465,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("foldByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
- val sums = pairs.foldByKey(0)(_+_).collect()
+ val sums = pairs.foldByKey(0)(_ + _).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
@@ -505,7 +505,8 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
conf.setOutputCommitter(classOf[FakeOutputCommitter])
FakeOutputCommitter.ran = false
- pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
+ pairs.saveAsHadoopFile(
+ "ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
}
@@ -552,7 +553,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
}
private object StratifiedAuxiliary {
- def stratifier (fractionPositive: Double) = {
+ def stratifier (fractionPositive: Double): (Int) => String = {
(x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
}
@@ -572,7 +573,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
def testSampleExact(stratifiedData: RDD[(String, Int)],
samplingRate: Double,
seed: Long,
- n: Long) = {
+ n: Long): Unit = {
testBernoulli(stratifiedData, true, samplingRate, seed, n)
testPoisson(stratifiedData, true, samplingRate, seed, n)
}
@@ -580,7 +581,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
def testSample(stratifiedData: RDD[(String, Int)],
samplingRate: Double,
seed: Long,
- n: Long) = {
+ n: Long): Unit = {
testBernoulli(stratifiedData, false, samplingRate, seed, n)
testPoisson(stratifiedData, false, samplingRate, seed, n)
}
@@ -590,7 +591,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
exact: Boolean,
samplingRate: Double,
seed: Long,
- n: Long) = {
+ n: Long): Unit = {
val expectedSampleSize = stratifiedData.countByKey()
.mapValues(count => math.ceil(count * samplingRate).toInt)
val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
@@ -612,7 +613,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
exact: Boolean,
samplingRate: Double,
seed: Long,
- n: Long) = {
+ n: Long): Unit = {
val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
math.ceil(count * samplingRate).toInt)
val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
@@ -701,27 +702,27 @@ class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
*/
class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
- def close(p1: NewTaskAttempContext) = ()
+ def close(p1: NewTaskAttempContext): Unit = ()
- def write(p1: Integer, p2: Integer) = ()
+ def write(p1: Integer, p2: Integer): Unit = ()
}
class NewFakeCommitter extends NewOutputCommitter {
- def setupJob(p1: NewJobContext) = ()
+ def setupJob(p1: NewJobContext): Unit = ()
def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
- def setupTask(p1: NewTaskAttempContext) = ()
+ def setupTask(p1: NewTaskAttempContext): Unit = ()
- def commitTask(p1: NewTaskAttempContext) = ()
+ def commitTask(p1: NewTaskAttempContext): Unit = ()
- def abortTask(p1: NewTaskAttempContext) = ()
+ def abortTask(p1: NewTaskAttempContext): Unit = ()
}
class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
- def checkOutputSpecs(p1: NewJobContext) = ()
+ def checkOutputSpecs(p1: NewJobContext): Unit = ()
def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
new NewFakeWriter()
@@ -735,7 +736,7 @@ class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
class ConfigTestFormat() extends NewFakeFormat() with Configurable {
var setConfCalled = false
- def setConf(p1: Configuration) = {
+ def setConf(p1: Configuration): Unit = {
setConfCalled = true
()
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index cd193ae4f5..1880364581 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -100,7 +100,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = 1 until 100
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 99)
+ assert(slices.map(_.size).reduceLeft(_ + _) === 99)
assert(slices.forall(_.isInstanceOf[Range]))
}
@@ -108,7 +108,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = 1 to 100
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 100)
+ assert(slices.map(_.size).reduceLeft(_ + _) === 100)
assert(slices.forall(_.isInstanceOf[Range]))
}
@@ -139,7 +139,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(i).isInstanceOf[Range])
val range = slices(i).asInstanceOf[Range]
assert(range.start === i * (N / 40), "slice " + i + " start")
- assert(range.end === (i+1) * (N / 40), "slice " + i + " end")
+ assert(range.end === (i + 1) * (N / 40), "slice " + i + " end")
assert(range.step === 1, "slice " + i + " step")
}
}
@@ -156,7 +156,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val slices = ParallelCollectionRDD.slice(d, n)
("n slices" |: slices.size == n) &&
("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
- ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
+ ("equal sizes" |: slices.map(_.size).forall(x => x == d.size / n || x == d.size /n + 1))
}
check(prop)
}
@@ -174,7 +174,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
("n slices" |: slices.size == n) &&
("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
- ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
+ ("equal sizes" |: slices.map(_.size).forall(x => x == d.size / n || x == d.size / n + 1))
}
check(prop)
}
@@ -192,7 +192,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
("n slices" |: slices.size == n) &&
("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
- ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
+ ("equal sizes" |: slices.map(_.size).forall(x => x == d.size / n || x == d.size / n + 1))
}
check(prop)
}
@@ -201,7 +201,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = 1L until 100L
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 99)
+ assert(slices.map(_.size).reduceLeft(_ + _) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
@@ -209,7 +209,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = 1L to 100L
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 100)
+ assert(slices.map(_.size).reduceLeft(_ + _) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
@@ -217,7 +217,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = 1.0 until 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 99)
+ assert(slices.map(_.size).reduceLeft(_ + _) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
@@ -225,7 +225,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = 1.0 to 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
- assert(slices.map(_.size).reduceLeft(_+_) === 100)
+ assert(slices.map(_.size).reduceLeft(_ + _) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
index 8408d7e785..465068c6cb 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
@@ -23,7 +23,6 @@ import org.apache.spark.{Partition, SharedSparkContext, TaskContext}
class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
-
test("Pruned Partitions inherit locality prefs correctly") {
val rdd = new RDD[Int](sc, Nil) {
@@ -74,8 +73,6 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
}
class TestPartition(i: Int, value: Int) extends Partition with Serializable {
- def index = i
-
- def testValue = this.value
-
+ def index: Int = i
+ def testValue: Int = this.value
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
index a0483886f8..0d1369c19c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
@@ -35,7 +35,7 @@ class MockSampler extends RandomSampler[Long, Long] {
Iterator(s)
}
- override def clone = new MockSampler
+ override def clone: MockSampler = new MockSampler
}
class PartitionwiseSampledRDDSuite extends FunSuite with SharedSparkContext {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index bede1ffb3e..df42faab64 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -82,7 +82,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("countApproxDistinct") {
- def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+ def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble
val size = 1000
val uniformDistro = for (i <- 1 to 5000) yield i % size
@@ -100,7 +100,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
test("partitioner aware union") {
- def makeRDDWithPartitioner(seq: Seq[Int]) = {
+ def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
sc.makeRDD(seq, 1)
.map(x => (x, null))
.partitionBy(new HashPartitioner(2))
@@ -159,8 +159,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("treeAggregate") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
- def seqOp = (c: Long, x: Int) => c + x
- def combOp = (c1: Long, c2: Long) => c1 + c2
+ def seqOp: (Long, Int) => Long = (c: Long, x: Int) => c + x
+ def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
for (depth <- 1 until 10) {
val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
assert(sum === -1000L)
@@ -204,7 +204,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(empty.collect().size === 0)
val thrown = intercept[UnsupportedOperationException]{
- empty.reduce(_+_)
+ empty.reduce(_ + _)
}
assert(thrown.getMessage.contains("empty"))
@@ -321,7 +321,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
- val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
+ val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i + 2)).map{ j => "m" + (j%6)})))
val coalesced1 = data.coalesce(3)
assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
@@ -921,15 +921,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("task serialization exception should not hang scheduler") {
class BadSerializable extends Serializable {
@throws(classOf[IOException])
- private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization")
+ private def writeObject(out: ObjectOutputStream): Unit =
+ throw new KryoException("Bad serialization")
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = {}
}
- // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if there were
- // more threads in the Spark Context than there were number of objects in this sequence.
+ // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if
+ // there were more threads in the Spark Context than there were number of objects in this
+ // sequence.
intercept[Throwable] {
- sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect
+ sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect()
}
// Check that the context has not crashed
sc.parallelize(1 to 100).map(x => x*2).collect
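The RDDSuite hunks are mostly mechanical style fixes: infix operators get a space on each side (`_ + _`), no-argument methods that do real work are called with parentheses (`collect()`), local helper defs state their return type, and comments are re-wrapped so no line exceeds 100 characters. A short illustrative sketch, not code from the patch:

object OperatorStyleSketch {
  val xs = Seq(1, 2, 3)

  // Explicit return type on the helper; spaces around the folded operator.
  def total(values: Seq[Int]): Int = values.reduce(_ + _)

  // Side-effecting no-arg method, so it is declared and invoked with ().
  def flush(): Unit = println(s"total = ${total(xs)}")
  flush()
}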
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
index 4762fc1785..fe695d85e2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
@@ -21,11 +21,11 @@ object RDDSuiteUtils {
case class Person(first: String, last: String, age: Int)
object AgeOrdering extends Ordering[Person] {
- def compare(a:Person, b:Person) = a.age compare b.age
+ def compare(a:Person, b:Person): Int = a.age.compare(b.age)
}
object NameOrdering extends Ordering[Person] {
- def compare(a:Person, b:Person) =
+ def compare(a:Person, b:Person): Int =
implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first))
}
}
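The Ordering fix above adds the `Int` result type to `compare` and switches from infix `compare` to ordinary method-call syntax. A self-contained sketch with a hypothetical case class:

case class Item(name: String, weight: Int)

object WeightOrdering extends Ordering[Item] {
  // Declared result type; dot-call form instead of `a.weight compare b.weight`.
  def compare(a: Item, b: Item): Int = a.weight.compare(b.weight)
}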
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 5a734ec5ba..ada07ef11c 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -70,7 +70,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("send-remotely", new RpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case msg: String => message = msg
}
})
@@ -109,7 +109,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String => {
context.reply(msg)
}
@@ -123,7 +123,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("ask-remotely", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String => {
context.reply(msg)
}
@@ -146,7 +146,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("ask-timeout", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String => {
Thread.sleep(100)
context.reply(msg)
@@ -182,7 +182,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
calledMethods += "start"
}
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case msg: String =>
}
@@ -206,7 +206,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
throw new RuntimeException("Oops!")
}
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m =>
}
@@ -225,7 +225,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("onError-onStop", new RpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m =>
}
@@ -250,8 +250,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("onError-receive", new RpcEndpoint {
override val rpcEnv = env
- override def receive = {
- case m => throw new RuntimeException("Oops!")
+ override def receive: PartialFunction[Any, Unit] = {
+ case m => throw new RuntimeException("Oops!")
}
override def onError(cause: Throwable): Unit = {
@@ -277,7 +277,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
callSelfSuccessfully = true
}
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m =>
}
})
@@ -294,7 +294,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("self-receive", new RpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m => {
self
callSelfSuccessfully = true
@@ -316,7 +316,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("self-onStop", new RpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m =>
}
@@ -343,7 +343,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint(s"receive-in-sequence-$i", new ThreadSafeRpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m => result += 1
}
@@ -372,7 +372,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("stop-reentrant", new RpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case m =>
}
@@ -394,7 +394,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("sendWithReply", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case m => context.reply("ack")
}
})
@@ -410,7 +410,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("sendWithReply-remotely", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case m => context.reply("ack")
}
})
@@ -432,7 +432,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
val endpointRef = env.setupEndpoint("sendWithReply-error", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case m => context.sendFailure(new SparkException("Oops"))
}
})
@@ -450,7 +450,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("sendWithReply-remotely-error", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String => context.sendFailure(new SparkException("Oops"))
}
})
@@ -476,7 +476,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("network-events", new ThreadSafeRpcEndpoint {
override val rpcEnv = env
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case "hello" =>
case m => events += "receive" -> m
}
@@ -519,7 +519,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint {
override val rpcEnv = env
- override def receiveAndReply(context: RpcCallContext) = {
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String => context.sendFailure(new UnserializableException)
}
})
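Every `receive` and `receiveAndReply` override in this suite now carries an explicit `PartialFunction[Any, Unit]` result type; without it, each anonymous endpoint would expose whatever type the compiler infers for its partial-function literal. A rough sketch against a made-up endpoint trait (not Spark's actual RpcEndpoint API):

object EndpointSketch {
  trait Endpoint {
    def receive: PartialFunction[Any, Unit]
  }

  val echo: Endpoint = new Endpoint {
    // The annotation keeps the override's signature identical to the trait's.
    override def receive: PartialFunction[Any, Unit] = {
      case msg: String => println(msg)
    }
  }
}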
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index eb759f0807..3c52a8c446 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -57,20 +57,18 @@ class MyRDD(
locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
- override def getPartitions = (0 until numPartitions).map(i => new Partition {
- override def index = i
+ override def getPartitions: Array[Partition] = (0 until numPartitions).map(i => new Partition {
+ override def index: Int = i
}).toArray
override def getPreferredLocations(split: Partition): Seq[String] =
- if (locations.isDefinedAt(split.index))
- locations(split.index)
- else
- Nil
+ if (locations.isDefinedAt(split.index)) locations(split.index) else Nil
override def toString: String = "DAGSchedulerSuiteRDD " + id
}
class DAGSchedulerSuiteDummyException extends Exception
-class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSparkContext with Timeouts {
+class DAGSchedulerSuite
+ extends FunSuiteLike with BeforeAndAfter with LocalSparkContext with Timeouts {
val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
@@ -209,7 +207,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
- runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
}
}
}
@@ -269,21 +268,23 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
submit(new MyRDD(sc, 1, Nil), Array(0))
complete(taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("local job") {
val rdd = new PairOfIntsRDD(sc, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
Array(42 -> 0).iterator
- override def getPartitions = Array( new Partition { override def index = 0 } )
- override def getPreferredLocations(split: Partition) = Nil
- override def toString = "DAGSchedulerSuite Local RDD"
+ override def getPartitions: Array[Partition] =
+ Array( new Partition { override def index: Int = 0 } )
+ override def getPreferredLocations(split: Partition): List[String] = Nil
+ override def toString: String = "DAGSchedulerSuite Local RDD"
}
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
+ runEvent(
+ JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("local job oom") {
@@ -295,9 +296,10 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
override def toString = "DAGSchedulerSuite Local RDD"
}
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
+ runEvent(
+ JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
assert(results.size == 0)
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("run trivial job w/ dependency") {
@@ -306,7 +308,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("cache location preferences w/ dependency") {
@@ -319,7 +321,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
complete(taskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("regression test for getCacheLocs") {
@@ -335,7 +337,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
}
test("avoid exponential blowup when getting preferred locs list") {
- // Build up a complex dependency graph with repeated zip operations, without preferred locations.
+ // Build up a complex dependency graph with repeated zip operations, without preferred locations
var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
(1 to 30).foreach(_ => rdd = rdd.zip(rdd))
// getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
@@ -357,7 +359,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("trivial job failure") {
@@ -367,7 +369,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("trivial job cancellation") {
@@ -378,7 +380,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0))
assert(sparkListener.failedStages.size === 1)
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("job cancellation no-kill backend") {
@@ -387,18 +389,20 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
val noKillTaskScheduler = new TaskScheduler() {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
- override def start() = {}
- override def stop() = {}
- override def submitTasks(taskSet: TaskSet) = {
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def submitTasks(taskSet: TaskSet): Unit = {
taskSets += taskSet
}
override def cancelTasks(stageId: Int, interruptThread: Boolean) {
throw new UnsupportedOperationException
}
- override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
- override def defaultParallelism() = 2
- override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean = true
+ override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+ override def defaultParallelism(): Int = 2
+ override def executorHeartbeatReceived(
+ execId: String,
+ taskMetrics: Array[(Long, TaskMetrics)],
+ blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
val noKillScheduler = new DAGScheduler(
@@ -422,7 +426,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
// When the task set completes normally, state should be correctly updated.
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.isEmpty)
@@ -442,7 +446,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
complete(taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("run trivial shuffle with fetch failure") {
@@ -465,10 +469,11 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
// have the 2nd attempt pass
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
// we can see both result blocks now
- assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
+ assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
+ Array("hostA", "hostB"))
complete(taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("trivial shuffle with multiple fetch failures") {
@@ -521,19 +526,23 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("run shuffle with map stage failure") {
@@ -552,7 +561,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.toSet === Set(0))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
/**
@@ -586,7 +595,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
class FailureRecordingJobListener() extends JobListener {
var failureMessage: String = _
override def taskSucceeded(index: Int, result: Any) {}
- override def jobFailed(exception: Exception) = { failureMessage = exception.getMessage }
+ override def jobFailed(exception: Exception): Unit = { failureMessage = exception.getMessage }
}
val listener1 = new FailureRecordingJobListener()
val listener2 = new FailureRecordingJobListener()
@@ -606,7 +615,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("run trivial shuffle with out-of-band failure and retry") {
@@ -629,7 +638,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("recursive shuffle failures") {
@@ -658,7 +667,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
complete(taskSets(5), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("cached post-shuffle") {
@@ -690,7 +699,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
complete(taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
@@ -742,7 +751,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
}
test("accumulator not calculated for resubmitted result stage") {
- //just for register
+ // just for register
val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
val finalRdd = new MyRDD(sc, 1, Nil)
submit(finalRdd, Array(0))
@@ -754,7 +763,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(accVal === 1)
- assertDataStructuresEmpty
+ assertDataStructuresEmpty()
}
/**
@@ -774,7 +783,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
- private def assertDataStructuresEmpty = {
+ private def assertDataStructuresEmpty(): Unit = {
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
assert(scheduler.jobIdToActiveJob.isEmpty)
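Two changes repeat throughout DAGSchedulerSuite: calls whose argument list would cross the 100-character limit are broken right after the opening parenthesis, with the arguments on one indented continuation line, and `assertDataStructuresEmpty` gains empty parentheses because it exists for its side effects (assertions). A hedged sketch with invented names:

object WrapSketch {
  def runFakeEvent(name: String, attempt: Int, host: String, succeeded: Boolean): Unit = ()

  // Arguments moved to a single continuation line, indented one extra level.
  runFakeEvent(
    "completion-event-with-a-fairly-long-descriptive-name", 3, "hostA", succeeded = true)

  // Assertion helpers are declared and called with () since they only have side effects.
  def assertAllQueuesEmpty(): Unit = assert(Seq.empty[Int].isEmpty)
  assertAllQueuesEmpty()
}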
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 30ee63e78d..6d25edb7d2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -268,7 +268,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
object EventLoggingListenerSuite {
/** Get a SparkConf with event logging enabled. */
- def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
+ def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = {
val conf = new SparkConf
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.testing", "true")
@@ -280,5 +280,5 @@ object EventLoggingListenerSuite {
conf
}
- def getUniqueApplicationId = "test-" + System.currentTimeMillis
+ def getUniqueApplicationId: String = "test-" + System.currentTimeMillis
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
index 6b75c98839..9b92f8de56 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
@@ -24,7 +24,9 @@ import org.apache.spark.TaskContext
/**
* A Task implementation that fails to serialize.
*/
-private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int)
+ extends Task[Array[Byte]](stageId, 0) {
+
override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
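When a class header plus its extends clause no longer fits on one line, the declaration is split: constructor parameters stay with the class name and the `extends ... with ...` clause moves to an indented second line, with a blank line before the body. A sketch with placeholder traits:

trait Closeable
trait Loggable

class RatherLongExampleComponentName(id: Int, stageId: Int)
  extends Closeable with Loggable {

  def describe: String = s"component $id in stage $stageId"
}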
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 627c9a4ddf..825c616c0c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -85,7 +85,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
val stopperReturned = new Semaphore(0)
class BlockingListener extends SparkListener {
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
listenerStarted.release()
listenerWait.acquire()
drained = true
@@ -206,8 +206,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
sc.addSparkListener(new StatsReportListener)
// just to make sure some of the tasks take a noticeable amount of time
val w = { i: Int =>
- if (i == 0)
+ if (i == 0) {
Thread.sleep(100)
+ }
i
}
@@ -247,12 +248,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
*/
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
- taskMetrics.resultSize should be > (0l)
+ taskMetrics.resultSize should be > (0L)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
taskMetrics.inputMetrics should not be ('defined)
taskMetrics.outputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
- taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
+ taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L)
}
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
taskMetrics.shuffleReadMetrics should be ('defined)
@@ -260,7 +261,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
sm.totalBlocksFetched should be (128)
sm.localBlocksFetched should be (128)
sm.remoteBlocksFetched should be (0)
- sm.remoteBytesRead should be (0l)
+ sm.remoteBytesRead should be (0L)
}
}
}
@@ -406,12 +407,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
val startedGettingResultTasks = new mutable.HashSet[Int]()
val endedTasks = new mutable.HashSet[Int]()
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
startedTasks += taskStart.taskInfo.index
notify()
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
endedTasks += taskEnd.taskInfo.index
notify()
}
@@ -425,7 +426,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
* A simple listener that throws an exception on job end.
*/
private class BadListener extends SparkListener {
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = { throw new Exception }
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { throw new Exception }
}
}
@@ -438,10 +439,10 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
*/
private class BasicJobCounter extends SparkListener {
var count = 0
- override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+ override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}
private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
- override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+ override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}
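The SparkListenerSuite hunks add `: Unit` to listener callbacks, which run purely for their side effects, and replace the lowercase long suffix `0l` (easy to misread as `01`) with `0L`. A small sketch, not the suite's own listener types:

object ListenerSketch {
  trait JobEvents {
    def onJobEnd(jobId: Int): Unit
  }

  class CountingListener extends JobEvents {
    var count = 0
    override def onJobEnd(jobId: Int): Unit = count += 1
  }

  val bytesRead: Long = 0L   // uppercase suffix, never 0l
}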
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index add13f5b21..ffa4381969 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.scheduler
-import java.util.Properties
-
import org.scalatest.FunSuite
import org.apache.spark._
@@ -27,7 +25,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
def start() {}
def stop() {}
def reviveOffers() {}
- def defaultParallelism() = 1
+ def defaultParallelism(): Int = 1
}
class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
@@ -115,7 +113,8 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
}
val numFreeCores = 1
taskScheduler.setDAGScheduler(dagScheduler)
- var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+ val taskSet = new TaskSet(
+ Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
@@ -123,7 +122,8 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(0 === taskDescriptions.length)
// Now check that we can still submit tasks
- // Even if one of the tasks has not-serializable tasks, the other task set should still be processed without error
+ // Even if one of the tasks has not-serializable tasks, the other task set should
+ // still be processed without error
taskScheduler.submitTasks(taskSet)
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 12330d8f63..716d12c076 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -67,7 +67,7 @@ object FakeRackUtil {
hostToRack(host) = rack
}
- def getRackForHost(host: String) = {
+ def getRackForHost(host: String): Option[String] = {
hostToRack.get(host)
}
}
@@ -327,8 +327,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
- // After this, nothing should get chosen, because we have separated tasks with unavailable preference
- // from the noPrefPendingTasks
+ // After this, nothing should get chosen, because we have separated tasks with unavailable
+ // preference from the noPrefPendingTasks
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
// Now mark host2 as dead
@@ -499,7 +499,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sched.addExecutor("execC", "host2")
manager.executorAdded()
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
- assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
+ assert(manager.myLocalityLevels.sameElements(
+ Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
// test if the valid locality is recomputed when the executor is lost
sched.removeExecutor("execC")
manager.executorLost("execC", "host2")
@@ -569,7 +570,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
- val taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+ val taskSet = new TaskSet(
+ Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
intercept[TaskNotSerializableException] {
@@ -582,7 +584,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
sc = new SparkContext("local", "test", conf)
- def genBytes(size: Int) = { (x: Int) =>
+ def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) =>
val bytes = Array.ofDim[Byte](size)
scala.util.Random.nextBytes(bytes)
bytes
@@ -605,7 +607,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ val sched = new FakeTaskScheduler(
+ sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2"), TaskLocation("host1")),
@@ -629,9 +632,11 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
- test("node-local tasks should be scheduled right away when there are only node-local and no-preference tasks") {
+ test("node-local tasks should be scheduled right away " +
+ "when there are only node-local and no-preference tasks") {
sc = new SparkContext("local", "test")
- val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ val sched = new FakeTaskScheduler(
+ sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
@@ -650,7 +655,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
}
- test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") {
+ test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished")
+ {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
val taskSet = FakeTask.createTaskSet(4,
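TaskSetManagerSuite mainly gets line-length fixes: long constructor calls are wrapped after the opening parenthesis and long test names are split across concatenated string literals. A sketch with a stand-in `test` helper (ScalaTest's real `test` is not shown here):

object LongNameSketch {
  def test(name: String)(body: => Unit): Unit = body

  // The description is split into adjacent literals so no line exceeds 100 characters.
  test("sketch: node-local tasks should be scheduled right away " +
    "when there are only node-local and no-preference tasks") {
    assert(1 + 1 == 2)
  }
}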
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index f1a4380d34..a311512e82 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -63,16 +63,18 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
// uri is null.
val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
- assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+ assert(executorInfo.getCommand.getValue ===
+ s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
// uri exists.
conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
- assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+ assert(executorInfo1.getCommand.getValue ===
+ s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
}
test("mesos resource offers result in launching tasks") {
- def createOffer(id: Int, mem: Int, cpu: Int) = {
+ def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -82,8 +84,10 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
+ builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+ .setHostname(s"host${id.toString}").build()
}
val driver = mock[SchedulerDriver]
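The Mesos offer construction is reflowed so each call in the long builder chain sits on its own line, continuing with a leading dot. A generic sketch using StringBuilder instead of the Mesos protobuf builders:

object BuilderChainSketch {
  val offerDescription: String = new StringBuilder()
    .append("offer ")
    .append("o1")
    .append(" on host1")
    .toString()
}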
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 6198df84fa..b070a54aa9 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -106,7 +106,9 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(mutable.HashMap(1 -> "one", 2 -> "two"))
check(mutable.HashMap("one" -> 1, "two" -> 2))
check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
- check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
+ check(List(
+ mutable.HashMap("one" -> 1, "two" -> 2),
+ mutable.HashMap(1->"one",2->"two",3->"three")))
}
test("ranges") {
@@ -169,7 +171,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("kryo with collect") {
val control = 1 :: 2 :: Nil
- val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
+ val result = sc.parallelize(control, 2)
+ .map(new ClassWithoutNoArgConstructor(_))
+ .collect()
+ .map(_.x)
assert(control === result.toSeq)
}
@@ -237,7 +242,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
// Set a special, broken ClassLoader and make sure we get an exception on deserialization
ser.setDefaultClassLoader(new ClassLoader() {
- override def loadClass(name: String) = throw new UnsupportedOperationException
+ override def loadClass(name: String): Class[_] = throw new UnsupportedOperationException
})
intercept[UnsupportedOperationException] {
ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
@@ -287,14 +292,14 @@ object KryoTest {
class ClassWithNoArgConstructor {
var x: Int = 0
- override def equals(other: Any) = other match {
+ override def equals(other: Any): Boolean = other match {
case c: ClassWithNoArgConstructor => x == c.x
case _ => false
}
}
class ClassWithoutNoArgConstructor(val x: Int) {
- override def equals(other: Any) = other match {
+ override def equals(other: Any): Boolean = other match {
case c: ClassWithoutNoArgConstructor => x == c.x
case _ => false
}
diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
index d037e2c19a..433fd6bb4a 100644
--- a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
@@ -24,14 +24,16 @@ import org.apache.spark.rdd.RDD
/* A trivial (but unserializable) container for trivial functions */
class UnserializableClass {
- def op[T](x: T) = x.toString
+ def op[T](x: T): String = x.toString
- def pred[T](x: T) = x.toString.length % 2 == 0
+ def pred[T](x: T): Boolean = x.toString.length % 2 == 0
}
class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContext {
- def fixture = (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass)
+ def fixture: (RDD[String], UnserializableClass) = {
+ (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass)
+ }
test("throws expected serialization exceptions on actions") {
val (data, uc) = fixture
diff --git a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
index 0ade1bab18..963264cef3 100644
--- a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
* A serializer implementation that always return a single element in a deserialization stream.
*/
class TestSerializer extends Serializer {
- override def newInstance() = new TestSerializerInstance
+ override def newInstance(): TestSerializerInstance = new TestSerializerInstance
}
@@ -36,7 +36,8 @@ class TestSerializerInstance extends SerializerInstance {
override def serializeStream(s: OutputStream): SerializationStream = ???
- override def deserializeStream(s: InputStream) = new TestDeserializationStream
+ override def deserializeStream(s: InputStream): TestDeserializationStream =
+ new TestDeserializationStream
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = ???
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index b834dc0e73..7d76435cd7 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -85,8 +85,8 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
// Now comes the test :
// Write to shuffle 3; and close it, but before registering it, check if the file lengths for
// previous task (for shuffle1) is the same as 'segments'. Earlier, we were inferring length
- // of block based on remaining data in file : which could mess things up when there is concurrent read
- // and writes happening to the same shuffle group.
+ // of block based on remaining data in file : which could mess things up when there is
+ // concurrent read and writes happening to the same shuffle group.
val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
new ShuffleWriteMetrics)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6dc5bc4cb0..545722b050 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -60,7 +60,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
- def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
+ def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
private def makeBlockManager(
maxMem: Long,
@@ -107,8 +107,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
test("StorageLevel object caching") {
val level1 = StorageLevel(false, false, false, false, 3)
- val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1
- val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object
+ // this should return the same object as level1
+ val level2 = StorageLevel(false, false, false, false, 3)
+ // this should return a different object
+ val level3 = StorageLevel(false, false, false, false, 2)
assert(level2 === level1, "level2 is not same as level1")
assert(level2.eq(level1), "level2 is not the same object as level1")
assert(level3 != level1, "level3 is same as level1")
@@ -802,7 +804,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
// Create a non-trivial (not all zeros) byte array
var counter = 0.toByte
- def incr = {counter = (counter + 1).toByte; counter;}
+ def incr: Byte = {counter = (counter + 1).toByte; counter;}
val bytes = Array.fill[Byte](1000)(incr)
val byteBuffer = ByteBuffer.wrap(bytes)
@@ -956,8 +958,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
- assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
- assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
+ assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
+ === 3)
+ assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size
+ === 1)
// insert some more blocks
store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
@@ -965,8 +969,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// getLocations and getBlockStatus should yield the same locations
- assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
- assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
+ assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
+ === 1)
+ assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size
+ === 3)
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
@@ -1090,8 +1096,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
val memoryStore = store.memoryStore
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
- def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
- def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
// Unroll with plenty of space. This should succeed and cache both blocks.
@@ -1144,8 +1150,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
val diskStore = store.diskStore
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
- def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
- def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
store.putIterator("b1", smallIterator, memAndDisk)
@@ -1187,7 +1193,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
val memOnly = StorageLevel.MEMORY_ONLY
val memoryStore = store.memoryStore
val smallList = List.fill(40)(new Array[Byte](100))
- def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
// All unroll memory used is released because unrollSafely returned an array
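Most BlockManagerSuite changes either move a trailing comment onto its own line above the statement or split an over-long assertion so the comparison operator starts the continuation line. A brief sketch of the second pattern, with invented data:

object AssertWrapSketch {
  val blockNames = Seq("list1", "list2", "list3")

  // The expression is broken before ===, keeping both lines under the limit.
  assert(blockNames.count(_.startsWith("list"))
    === 3)
}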
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index 82a82e23ee..b47157f833 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -47,7 +47,7 @@ class LocalDirsSuite extends FunSuite with BeforeAndAfter {
assert(!new File("/NONEXISTENT_DIR").exists())
// SPARK_LOCAL_DIRS is a valid directory:
class MySparkConf extends SparkConf(false) {
- override def getenv(name: String) = {
+ override def getenv(name: String): String = {
if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
else super.getenv(name)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 0d155982a8..1cb594633f 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -137,7 +137,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
new SparkContext(conf)
}
- def hasKillLink = find(className("kill-link")).isDefined
+ def hasKillLink: Boolean = find(className("kill-link")).isDefined
def runSlowJob(sc: SparkContext) {
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index c0c28cb60e..21d8267114 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
val execId = "exe-1"
- def makeTaskMetrics(base: Int) = {
+ def makeTaskMetrics(base: Int): TaskMetrics = {
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
val shuffleWriteMetrics = new ShuffleWriteMetrics()
@@ -291,7 +291,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics
}
- def makeTaskInfo(taskId: Long, finishTime: Int = 0) = {
+ def makeTaskInfo(taskId: Long, finishTime: Int = 0): TaskInfo = {
val taskInfo = new TaskInfo(taskId, 0, 1, 0L, execId, "host1", TaskLocality.NODE_LOCAL,
false)
taskInfo.finishTime = finishTime
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index e1bc1379b5..3744e479d2 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -107,7 +107,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo0 = rddInfo0
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
- val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+ val stageInfo0 = new StageInfo(
+ 0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 054ef54e74..c47162779b 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -83,7 +83,7 @@ object TestObject {
class TestClass extends Serializable {
var x = 5
- def getX = x
+ def getX: Int = x
def run(): Int = {
var nonSer = new NonSerializable
@@ -95,7 +95,7 @@ class TestClass extends Serializable {
}
class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
- def getX = x
+ def getX: Int = x
def run(): Int = {
var nonSer = new NonSerializable
@@ -164,7 +164,7 @@ object TestObjectWithNesting {
}
class TestClassWithNesting(val y: Int) extends Serializable {
- def getY = y
+ def getY: Int = y
def run(): Int = {
var nonSer = new NonSerializable
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 43b6a405cb..c05317534c 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -109,7 +109,8 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
// verify whether the earliest file has been deleted
val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted
- logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n"))
+ logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" +
+ rolledOverFiles.mkString("\n"))
assert(rolledOverFiles.size > 2)
val earliestRolledOverFile = rolledOverFiles.head
val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles(
@@ -135,7 +136,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream)
val appender = FileAppender(testInputStream, testFile, conf)
- //assert(appender.getClass === classTag[ExpectedAppender].getClass)
+ // assert(appender.getClass === classTag[ExpectedAppender].getClass)
assert(appender.getClass.getSimpleName ===
classTag[ExpectedAppender].runtimeClass.getSimpleName)
if (appender.isInstanceOf[RollingFileAppender]) {
@@ -153,9 +154,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
import RollingFileAppender._
- def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy)
- def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size)
- def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval)
+ def rollingStrategy(strategy: String): Seq[(String, String)] =
+ Seq(STRATEGY_PROPERTY -> strategy)
+ def rollingSize(size: String): Seq[(String, String)] = Seq(SIZE_PROPERTY -> size)
+ def rollingInterval(interval: String): Seq[(String, String)] =
+ Seq(INTERVAL_PROPERTY -> interval)
val msInDay = 24 * 60 * 60 * 1000L
val msInHour = 60 * 60 * 1000L
diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
index 72e81f3f1a..403dcb03bd 100644
--- a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -71,7 +71,7 @@ class NextIteratorSuite extends FunSuite with Matchers {
class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
var closeCalled = 0
- override def getNext() = {
+ override def getNext(): Int = {
if (ints.size == 0) {
finished = true
0
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 7424c2e91d..67a9f75ff2 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -98,8 +98,10 @@ class SizeEstimatorSuite
// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
- assertResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
- assertResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
+ // 10 pointers plus 8-byte object
+ assertResult(72)(SizeEstimator.estimate(Array.fill(10)(d1)))
+ // 100 pointers plus 8-byte object
+ assertResult(432)(SizeEstimator.estimate(Array.fill(100)(d1)))
// Same thing with huge array containing the same element many times. Note that this won't
// return exactly 4032 because it can't tell that *all* the elements will equal the first
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
index c1c605cdb4..8b72fe665c 100644
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -63,7 +63,7 @@ class TimeStampedHashMapSuite extends FunSuite {
assert(map1.getTimestamp("k1").get < threshTime1)
assert(map1.getTimestamp("k2").isDefined)
assert(map1.getTimestamp("k2").get >= threshTime1)
- map1.clearOldValues(threshTime1) //should only clear k1
+ map1.clearOldValues(threshTime1) // should only clear k1
assert(map1.get("k1") === None)
assert(map1.get("k2").isDefined)
}
@@ -93,7 +93,7 @@ class TimeStampedHashMapSuite extends FunSuite {
assert(map1.getTimestamp("k1").get < threshTime1)
assert(map1.getTimestamp("k2").isDefined)
assert(map1.getTimestamp("k2").get >= threshTime1)
- map1.clearOldValues(threshTime1) //should only clear k1
+ map1.clearOldValues(threshTime1) // should only clear k1
assert(map1.get("k1") === None)
assert(map1.get("k2").isDefined)
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 5d93086082..449fb87f11 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -106,7 +106,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val second = 1000
val minute = second * 60
val hour = minute * 60
- def str = Utils.msDurationToString(_)
+ def str: (Long) => String = Utils.msDurationToString(_)
val sep = new DecimalFormatSymbols(Locale.getDefault()).getDecimalSeparator()
@@ -199,7 +199,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
- val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+ // The parent directory has two child directories
+ val child1: File = Utils.createTempDir(parent.getCanonicalPath)
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
val child3: File = Utils.createTempDir(child1.getCanonicalPath)
// set the last modified time of child1 to 30 secs old
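
The first UtilsSuite hunk gives the partially applied alias an explicit function type, and the second moves a long trailing comment above the declaration. A self-contained sketch of the function-type annotation, using a hypothetical msToString helper in place of Utils.msDurationToString:

    object DurationFormatExample {
      def msToString(ms: Long): String = s"$ms ms"

      // The alias states its full function type instead of leaving it inferred.
      def str: Long => String = msToString(_)
    }
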
diff --git a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
index 794a55d617..ce2968728a 100644
--- a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.FunSuite
@deprecated("suppress compile time deprecation warning", "1.0.0")
class VectorSuite extends FunSuite {
- def verifyVector(vector: Vector, expectedLength: Int) = {
+ def verifyVector(vector: Vector, expectedLength: Int): Unit = {
assert(vector.length == expectedLength)
assert(vector.elements.min > 0.0)
assert(vector.elements.max < 1.0)
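
The VectorSuite hunk marks a side-effecting helper as Unit explicitly. A hypothetical equivalent that avoids the ScalaTest dependency by using require:

    object VerifyExample {
      // Assertion-style helpers declare Unit rather than inferring it.
      def verifyInUnitInterval(xs: Seq[Double]): Unit = {
        require(xs.nonEmpty, "empty input")
        require(xs.min > 0.0 && xs.max < 1.0, "expected all elements in (0, 1)")
      }
    }
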
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 48f79ea651..dff8f3ddc8 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -185,7 +185,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
// reduceByKey
val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
- val result1 = rdd.reduceByKey(_+_).collect()
+ val result1 = rdd.reduceByKey(_ + _).collect()
assert(result1.toSet === Set[(Int, Int)]((0, 5), (1, 5)))
// groupByKey
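
The ExternalAppendOnlyMapSuite hunk adds spaces around the + operator inside the placeholder lambda. A tiny hypothetical example of the same spacing rule:

    object OperatorSpacingExample {
      val values = Seq(1, 2, 3, 4)
      // Binary operators take a space on each side, including in shorthand lambdas.
      val total: Int = values.reduce(_ + _)
      val remainders: Seq[Int] = values.map(_ % 2)
    }
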
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 72d96798b1..9ff067f86a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -553,10 +553,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
- def createCombiner(i: String) = ArrayBuffer[String](i)
- def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
- def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
- buffer1 ++= buffer2
+ def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
+ def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
+ def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String])
+ : ArrayBuffer[String] = buffer1 ++= buffer2
val agg = new Aggregator[String, String, ArrayBuffer[String]](
createCombiner _, mergeValue _, mergeCombiners _)
@@ -633,14 +633,17 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
- def createCombiner(i: Int) = ArrayBuffer[Int](i)
- def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
- def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+ def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
+ def mergeValue(buffer: ArrayBuffer[Int], i: Int): ArrayBuffer[Int] = buffer += i
+ def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]): ArrayBuffer[Int] = {
+ buf1 ++= buf2
+ }
val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None)
- sorter.insertAll((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
+ sorter.insertAll(
+ (1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
val it = sorter.iterator
while (it.hasNext) {
@@ -654,9 +657,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
- def createCombiner(i: String) = ArrayBuffer[String](i)
- def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
- def mergeCombiners(buf1: ArrayBuffer[String], buf2: ArrayBuffer[String]) = buf1 ++= buf2
+ def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
+ def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
+ def mergeCombiners(buf1: ArrayBuffer[String], buf2: ArrayBuffer[String]): ArrayBuffer[String] =
+ buf1 ++= buf2
val agg = new Aggregator[String, String, ArrayBuffer[String]](
createCombiner, mergeValue, mergeCombiners)
@@ -720,7 +724,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
// Using wrongOrdering to show integer overflow introduced exception.
val rand = new Random(100L)
val wrongOrdering = new Ordering[String] {
- override def compare(a: String, b: String) = {
+ override def compare(a: String, b: String): Int = {
val h1 = if (a == null) 0 else a.hashCode()
val h2 = if (b == null) 0 else b.hashCode()
h1 - h2
@@ -742,9 +746,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
// Using aggregation and external spill to make sure ExternalSorter using
// partitionKeyComparator.
- def createCombiner(i: String) = ArrayBuffer(i)
- def mergeValue(c: ArrayBuffer[String], i: String) = c += i
- def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]) = c1 ++= c2
+ def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer(i)
+ def mergeValue(c: ArrayBuffer[String], i: String): ArrayBuffer[String] = c += i
+ def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]): ArrayBuffer[String] =
+ c1 ++= c2
val agg = new Aggregator[String, String, ArrayBuffer[String]](
createCombiner, mergeValue, mergeCombiners)
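
The ExternalSorterSuite hunks combine two conventions: local helper defs get explicit return types, and signatures or calls that would exceed 100 characters are wrapped, breaking before the return type or after the opening parenthesis. A standalone sketch with hypothetical names (CombinerStyleExample is not part of the patch):

    import scala.collection.mutable.ArrayBuffer

    object CombinerStyleExample {
      def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer(i)
      def mergeValue(buf: ArrayBuffer[String], i: String): ArrayBuffer[String] = buf += i
      // A signature that would run past 100 characters breaks before the return type.
      def mergeCombiners(buf1: ArrayBuffer[String], buf2: ArrayBuffer[String])
        : ArrayBuffer[String] = buf1 ++= buf2
    }
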
diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
index ef7178bcdf..03f5f2d1b8 100644
--- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
@@ -28,7 +28,7 @@ import scala.language.reflectiveCalls
class XORShiftRandomSuite extends FunSuite with Matchers {
- def fixture = new {
+ def fixture: Object {val seed: Long; val hundMil: Int; val xorRand: XORShiftRandom} = new {
val seed = 1L
val xorRand = new XORShiftRandom(seed)
val hundMil = 1e8.toInt
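
The XORShiftRandomSuite hunk annotates the anonymous-class fixture with an explicit structural type instead of letting the refinement be inferred. A hypothetical fixture showing the same annotation (scala.util.Random stands in for XORShiftRandom):

    import scala.language.reflectiveCalls

    object FixtureStyleExample {
      // The structural result type is written out rather than inferred.
      def fixture: Object { val seed: Long; val rng: scala.util.Random } = new {
        val seed = 42L
        val rng = new scala.util.Random(seed)
      }
    }
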