aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala2
-rw-r--r--repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala139
5 files changed, 79 insertions, 73 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
index b91be821f0..256a5a7c28 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -17,9 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object RecoveryState
- extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
-
+private[spark] object RecoveryState extends Enumeration {
type MasterState = Value
val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5f6407aadc..da18d45e65 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -553,7 +553,7 @@ abstract class RDD[T: ClassTag](
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
@@ -563,7 +563,7 @@ abstract class RDD[T: ClassTag](
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 67a7f87a5c..7b41ef89f1 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -55,8 +55,7 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea
}
}
-object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
- "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
+object MetadataCleanerType extends Enumeration {
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index cb76275e39..b647e8a672 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite {
fail()
} catch {
case e: IllegalStateException => // OK
- case _ => fail()
+ case _: Throwable => fail()
}
}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index c230a03298..daaa2a0305 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -79,23 +79,25 @@ class ReplSuite extends FunSuite {
}
test("simple foreach with accumulator") {
- val output = runInterpreter("local", """
- |val accum = sc.accumulator(0)
- |sc.parallelize(1 to 10).foreach(x => accum += x)
- |accum.value
- """.stripMargin)
+ val output = runInterpreter("local",
+ """
+ |val accum = sc.accumulator(0)
+ |sc.parallelize(1 to 10).foreach(x => accum += x)
+ |accum.value
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res1: Int = 55", output)
}
test("external vars") {
- val output = runInterpreter("local", """
- |var v = 7
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
- |v = 10
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
- """.stripMargin)
+ val output = runInterpreter("local",
+ """
+ |var v = 7
+ |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Int = 70", output)
@@ -103,35 +105,38 @@ class ReplSuite extends FunSuite {
}
test("external classes") {
- val output = runInterpreter("local", """
- |class C {
- |def foo = 5
- |}
- |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
- """.stripMargin)
+ val output = runInterpreter("local",
+ """
+ |class C {
+ |def foo = 5
+ |}
+ |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Int = 50", output)
}
test("external functions") {
- val output = runInterpreter("local", """
- |def double(x: Int) = x + x
- |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
- """.stripMargin)
+ val output = runInterpreter("local",
+ """
+ |def double(x: Int) = x + x
+ |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Int = 110", output)
}
test("external functions that access vars") {
- val output = runInterpreter("local", """
- |var v = 7
- |def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
- |v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
- """.stripMargin)
+ val output = runInterpreter("local",
+ """
+ |var v = 7
+ |def getV() = v
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Int = 70", output)
@@ -142,13 +147,14 @@ class ReplSuite extends FunSuite {
// Test that the value that a broadcast var had when it was created is used,
// even if that variable is then modified in the driver program
// TODO: This doesn't actually work for arrays when we run in local mode!
- val output = runInterpreter("local", """
- |var array = new Array[Int](5)
- |val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
- |array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
- """.stripMargin)
+ val output = runInterpreter("local",
+ """
+ |var array = new Array[Int](5)
+ |val broadcastArray = sc.broadcast(array)
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |array(0) = 5
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
@@ -162,12 +168,13 @@ class ReplSuite extends FunSuite {
out.write("What's up?\n")
out.write("Goodbye\n")
out.close()
- val output = runInterpreter("local", """
- |var file = sc.textFile("%s/input").cache()
- |file.count()
- |file.count()
- |file.count()
- """.stripMargin.format(tempDir.getAbsolutePath))
+ val output = runInterpreter("local",
+ """
+ |var file = sc.textFile("%s/input").cache()
+ |file.count()
+ |file.count()
+ |file.count()
+ """.stripMargin.format(tempDir.getAbsolutePath))
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Long = 3", output)
@@ -176,18 +183,19 @@ class ReplSuite extends FunSuite {
}
test("local-cluster mode") {
- val output = runInterpreter("local-cluster[1,1,512]", """
- |var v = 7
- |def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
- |v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
- |var array = new Array[Int](5)
- |val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
- |array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
- """.stripMargin)
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |var v = 7
+ |def getV() = v
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |var array = new Array[Int](5)
+ |val broadcastArray = sc.broadcast(array)
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |array(0) = 5
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Int = 70", output)
@@ -198,18 +206,19 @@ class ReplSuite extends FunSuite {
if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
test("running on Mesos") {
- val output = runInterpreter("localquiet", """
- |var v = 7
- |def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
- |v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
- |var array = new Array[Int](5)
- |val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
- |array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
- """.stripMargin)
+ val output = runInterpreter("localquiet",
+ """
+ |var v = 7
+ |def getV() = v
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |var array = new Array[Int](5)
+ |val broadcastArray = sc.broadcast(array)
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |array(0) = 5
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """.stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("res0: Int = 70", output)