aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala17
-rw-r--r--core/src/test/scala/spark/BroadcastSuite.scala18
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala21
-rw-r--r--core/src/test/scala/spark/FileSuite.scala42
-rw-r--r--core/src/test/scala/spark/KryoSerializerSuite.scala3
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala24
-rw-r--r--core/src/test/scala/spark/PipedRDDSuite.scala19
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala18
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala61
-rw-r--r--core/src/test/scala/spark/SortingSuite.scala29
-rw-r--r--core/src/test/scala/spark/ThreadingSuite.scala25
11 files changed, 163 insertions, 114 deletions
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 0eda80af64..5ac7f5d381 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -1,6 +1,6 @@
package spark.bagel
-import org.scalatest.{FunSuite, Assertions}
+import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
@@ -13,9 +13,16 @@ import spark._
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
-class BagelSuite extends FunSuite with Assertions {
+class BagelSuite extends FunSuite with Assertions with BeforeAndAfter{
+
+ var sc: SparkContext = _
+
+ after{
+ sc.stop()
+ }
+
test("halting by voting") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 5
@@ -26,11 +33,10 @@ class BagelSuite extends FunSuite with Assertions {
}
for ((id, vert) <- result.collect)
assert(vert.age === numSupersteps)
- sc.stop()
}
test("halting by message silence") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(false, 0))))
val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
val numSupersteps = 5
@@ -48,6 +54,5 @@ class BagelSuite extends FunSuite with Assertions {
}
for ((id, vert) <- result.collect)
assert(vert.age === numSupersteps)
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 750703de30..d22c2d4295 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -1,23 +1,31 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
-class BroadcastSuite extends FunSuite {
+class BroadcastSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
test("basic broadcast") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val list = List(1, 2, 3, 4)
val listBroadcast = sc.broadcast(list)
val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
assert(results.collect.toSet === Set((1, 10), (2, 10)))
- sc.stop()
}
test("broadcast variables accessed in multiple threads") {
- val sc = new SparkContext("local[10]", "test")
+ sc = new SparkContext("local[10]", "test")
val list = List(1, 2, 3, 4)
val listBroadcast = sc.broadcast(list)
val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index 816411debe..7ef90cf2f6 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -1,6 +1,7 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import org.scalatest.prop.Checkers
import scala.collection.mutable.ArrayBuffer
@@ -22,11 +23,20 @@ object FailureSuiteState {
}
}
-class FailureSuite extends FunSuite {
+class FailureSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
test("failure in a single-stage job") {
- val sc = new SparkContext("local[1,1]", "test")
+ sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3, 3).map { x =>
FailureSuiteState.synchronized {
FailureSuiteState.tasksRun += 1
@@ -41,13 +51,12 @@ class FailureSuite extends FunSuite {
assert(FailureSuiteState.tasksRun === 4)
}
assert(results.toList === List(1,4,9))
- sc.stop()
FailureSuiteState.clear()
}
// Run a map-reduce job in which a reduce task deterministically fails once.
test("failure in a two-stage job") {
- val sc = new SparkContext("local[1,1]", "test")
+ sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
case (k, v) =>
FailureSuiteState.synchronized {
@@ -63,12 +72,11 @@ class FailureSuite extends FunSuite {
assert(FailureSuiteState.tasksRun === 4)
}
assert(results.toSet === Set((1, 1), (2, 4), (3, 9)))
- sc.stop()
FailureSuiteState.clear()
}
test("failure because task results are not serializable") {
- val sc = new SparkContext("local[1,1]", "test")
+ sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
val thrown = intercept[spark.SparkException] {
@@ -77,7 +85,6 @@ class FailureSuite extends FunSuite {
assert(thrown.getClass === classOf[spark.SparkException])
assert(thrown.getMessage.contains("NotSerializableException"))
- sc.stop()
FailureSuiteState.clear()
}
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index b12014e6be..3a77ed0f13 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -6,13 +6,23 @@ import scala.io.Source
import com.google.common.io.Files
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import org.apache.hadoop.io._
import SparkContext._
-class FileSuite extends FunSuite {
+class FileSuite extends FunSuite with BeforeAndAfter{
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
test("text files") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
@@ -23,11 +33,10 @@ class FileSuite extends FunSuite {
assert(content === "1\n2\n3\n4\n")
// Also try reading it in as a text file RDD
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
- sc.stop()
}
test("SequenceFiles") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@@ -35,11 +44,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
test("SequenceFile with writable key") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
@@ -47,11 +55,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
test("SequenceFile with writable value") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
@@ -59,11 +66,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
test("SequenceFile with writable key and value") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -71,11 +77,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
test("implicit conversions in reading SequenceFiles") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@@ -89,11 +94,10 @@ class FileSuite extends FunSuite {
assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
val output3 = sc.sequenceFile[IntWritable, String](outputDir)
assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
test("object files of ints") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
@@ -101,11 +105,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as an object file
val output = sc.objectFile[Int](outputDir)
assert(output.collect().toList === List(1, 2, 3, 4))
- sc.stop()
}
test("object files of complex types") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
@@ -113,12 +116,11 @@ class FileSuite extends FunSuite {
// Try reading the output back as an object file
val output = sc.objectFile[(Int, String)](outputDir)
assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
- sc.stop()
}
test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -126,12 +128,11 @@ class FileSuite extends FunSuite {
outputDir)
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
test("read SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -139,6 +140,5 @@ class FileSuite extends FunSuite {
val output =
sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 06d446ea24..e889769b9a 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -8,7 +8,8 @@ import com.esotericsoftware.kryo._
import SparkContext._
-class KryoSerializerSuite extends FunSuite {
+class KryoSerializerSuite extends FunSuite{
+
test("basic types") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index 7f7f9493dc..dfe6a295c8 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -1,12 +1,23 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import scala.collection.mutable.ArrayBuffer
import SparkContext._
-class PartitioningSuite extends FunSuite {
+class PartitioningSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
+
test("HashPartitioner equality") {
val p2 = new HashPartitioner(2)
val p4 = new HashPartitioner(4)
@@ -20,7 +31,7 @@ class PartitioningSuite extends FunSuite {
}
test("RangePartitioner equality") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
// Make an RDD where all the elements are the same so that the partition range bounds
// are deterministically all the same.
@@ -46,12 +57,10 @@ class PartitioningSuite extends FunSuite {
assert(p4 != descendingP4)
assert(descendingP2 != p2)
assert(descendingP4 != p4)
-
- sc.stop()
}
test("HashPartitioner not equal to RangePartitioner") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10).map(x => (x, x))
val rangeP2 = new RangePartitioner(2, rdd)
val hashP2 = new HashPartitioner(2)
@@ -59,11 +68,10 @@ class PartitioningSuite extends FunSuite {
assert(hashP2 === hashP2)
assert(hashP2 != rangeP2)
assert(rangeP2 != hashP2)
- sc.stop()
}
test("partitioner preservation") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
@@ -95,7 +103,5 @@ class PartitioningSuite extends FunSuite {
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
-
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index d5dc2efd91..c0cf034c72 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -1,12 +1,21 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import SparkContext._
-class PipedRDDSuite extends FunSuite {
-
+class PipedRDDSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
test("basic pipe") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"))
@@ -18,18 +27,16 @@ class PipedRDDSuite extends FunSuite {
assert(c(1) === "2")
assert(c(2) === "3")
assert(c(3) === "4")
- sc.stop()
}
test("pipe with env variable") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
val c = piped.collect()
assert(c.size === 2)
assert(c(0) === "LALALA")
assert(c(1) === "LALALA")
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 7199b634b7..1d240b471f 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -2,11 +2,21 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import SparkContext._
-class RDDSuite extends FunSuite {
+class RDDSuite extends FunSuite with BeforeAndAfter{
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
test("basic operations") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.reduce(_ + _) === 10)
@@ -18,11 +28,10 @@ class RDDSuite extends FunSuite {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
- sc.stop()
}
test("aggregate") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
val emptyMap = new StringMap {
@@ -40,6 +49,5 @@ class RDDSuite extends FunSuite {
}
val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 00b24464a6..c45c5935de 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -1,6 +1,7 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
@@ -12,9 +13,18 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
-class ShuffleSuite extends FunSuite {
+class ShuffleSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
test("groupByKey") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
@@ -22,11 +32,10 @@ class ShuffleSuite extends FunSuite {
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
- sc.stop()
}
test("groupByKey with duplicates") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
@@ -34,11 +43,10 @@ class ShuffleSuite extends FunSuite {
assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
- sc.stop()
}
test("groupByKey with negative key hash codes") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
@@ -46,11 +54,10 @@ class ShuffleSuite extends FunSuite {
assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
- sc.stop()
}
test("groupByKey with many output partitions") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey(10).collect()
assert(groups.size === 2)
@@ -58,37 +65,33 @@ class ShuffleSuite extends FunSuite {
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
- sc.stop()
}
test("reduceByKey") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
- sc.stop()
}
test("reduceByKey with collectAsMap") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collectAsMap()
assert(sums.size === 2)
assert(sums(1) === 7)
assert(sums(2) === 1)
- sc.stop()
}
test("reduceByKey with many output partitons") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
- sc.stop()
}
test("join") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2).collect()
@@ -99,11 +102,10 @@ class ShuffleSuite extends FunSuite {
(2, (1, 'y')),
(2, (1, 'z'))
))
- sc.stop()
}
test("join all-to-all") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
val joined = rdd1.join(rdd2).collect()
@@ -116,11 +118,10 @@ class ShuffleSuite extends FunSuite {
(1, (3, 'x')),
(1, (3, 'y'))
))
- sc.stop()
}
test("leftOuterJoin") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.leftOuterJoin(rdd2).collect()
@@ -132,11 +133,10 @@ class ShuffleSuite extends FunSuite {
(2, (1, Some('z'))),
(3, (1, None))
))
- sc.stop()
}
test("rightOuterJoin") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.rightOuterJoin(rdd2).collect()
@@ -148,20 +148,18 @@ class ShuffleSuite extends FunSuite {
(2, (Some(1), 'z')),
(4, (None, 'w'))
))
- sc.stop()
}
test("join with no matches") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 0)
- sc.stop()
}
test("join with many output partitions") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2, 10).collect()
@@ -172,11 +170,10 @@ class ShuffleSuite extends FunSuite {
(2, (1, 'y')),
(2, (1, 'z'))
))
- sc.stop()
}
test("groupWith") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
@@ -187,17 +184,15 @@ class ShuffleSuite extends FunSuite {
(3, (ArrayBuffer(1), ArrayBuffer())),
(4, (ArrayBuffer(), ArrayBuffer('w')))
))
- sc.stop()
}
test("zero-partition RDD") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.splits.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
- sc.stop()
- }
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ }
}
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index caff884966..ced3c66d38 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -1,50 +1,55 @@
package spark
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import SparkContext._
-class SortingSuite extends FunSuite {
+class SortingSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
test("sortByKey") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
- assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
- sc.stop()
+ assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
}
test("sortLargeArray") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
- sc.stop()
}
test("sortDescending") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr)
assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
- sc.stop()
}
test("morePartitionsThanElements") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 30)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
- sc.stop()
}
test("emptyRDD") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = new Array[(Int, Int)](0)
val pairs = sc.parallelize(pairArr)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
- sc.stop()
}
}
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index d38e72d8b8..d617e8c15f 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
import SparkContext._
@@ -21,9 +22,19 @@ object ThreadingSuiteState {
}
}
-class ThreadingSuite extends FunSuite {
+class ThreadingSuite extends FunSuite with BeforeAndAfter {
+
+ var sc: SparkContext = _
+
+ after{
+ if(sc != null){
+ sc.stop()
+ }
+ }
+
+
test("accessing SparkContext form a different thread") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
val sem = new Semaphore(0)
@volatile var answer1: Int = 0
@@ -38,11 +49,10 @@ class ThreadingSuite extends FunSuite {
sem.acquire()
assert(answer1 === 55)
assert(answer2 === 1)
- sc.stop()
}
test("accessing SparkContext form multiple threads") {
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
val sem = new Semaphore(0)
@volatile var ok = true
@@ -67,11 +77,10 @@ class ThreadingSuite extends FunSuite {
if (!ok) {
fail("One or more threads got the wrong answer from an RDD operation")
}
- sc.stop()
}
test("accessing multi-threaded SparkContext form multiple threads") {
- val sc = new SparkContext("local[4]", "test")
+ sc = new SparkContext("local[4]", "test")
val nums = sc.parallelize(1 to 10, 2)
val sem = new Semaphore(0)
@volatile var ok = true
@@ -96,13 +105,12 @@ class ThreadingSuite extends FunSuite {
if (!ok) {
fail("One or more threads got the wrong answer from an RDD operation")
}
- sc.stop()
}
test("parallel job execution") {
// This test launches two jobs with two threads each on a 4-core local cluster. Each thread
// waits until there are 4 threads running at once, to test that both jobs have been launched.
- val sc = new SparkContext("local[4]", "test")
+ sc = new SparkContext("local[4]", "test")
val nums = sc.parallelize(1 to 2, 2)
val sem = new Semaphore(0)
ThreadingSuiteState.clear()
@@ -132,6 +140,5 @@ class ThreadingSuite extends FunSuite {
if (ThreadingSuiteState.failed.get()) {
fail("One or more threads didn't see runningThreads = 4")
}
- sc.stop()
}
}