aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/scala/spark/AccumulatorSuite.scala32
-rw-r--r--core/src/test/scala/spark/BroadcastSuite.scala14
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala19
-rw-r--r--core/src/test/scala/spark/ClosureCleanerSuite.scala73
-rw-r--r--core/src/test/scala/spark/DistributedSuite.scala23
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala14
-rw-r--r--core/src/test/scala/spark/FileServerSuite.scala16
-rw-r--r--core/src/test/scala/spark/FileSuite.scala16
-rw-r--r--core/src/test/scala/spark/LocalSparkContext.scala41
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala7
-rw-r--r--core/src/test/scala/spark/PartitioningSuite.scala15
-rw-r--r--core/src/test/scala/spark/PipedRDDSuite.scala16
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala15
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala14
-rw-r--r--core/src/test/scala/spark/SortingSuite.scala13
-rw-r--r--core/src/test/scala/spark/ThreadingSuite.scala14
-rw-r--r--core/src/test/scala/spark/scheduler/TaskContextSuite.scala14
17 files changed, 110 insertions, 246 deletions
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index d8be99dde7..78d64a44ae 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -1,6 +1,5 @@
package spark
-import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import collection.mutable
@@ -9,18 +8,7 @@ import scala.math.exp
import scala.math.signum
import spark.SparkContext._
-class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-
- var sc: SparkContext = null
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
test ("basic accumulation"){
sc = new SparkContext("local", "test")
@@ -53,10 +41,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
for (i <- 1 to maxI) {
v should contain(i)
}
- sc.stop()
- sc = null
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ resetSparkContext()
}
}
@@ -86,10 +71,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
x => acc.value += x
}
} should produce [SparkException]
- sc.stop()
- sc = null
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ resetSparkContext()
}
}
@@ -115,10 +97,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
bufferAcc.value should contain(i)
mapAcc.value should contain (i -> i.toString)
}
- sc.stop()
- sc = null
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ resetSparkContext()
}
}
@@ -134,8 +113,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
x => acc.localValue ++= x
}
acc.value should be ( (0 to maxI).toSet)
- sc.stop()
- sc = null
+ resetSparkContext()
}
}
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
index 2d3302f0aa..362a31fb0d 100644
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/spark/BroadcastSuite.scala
@@ -1,20 +1,8 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-class BroadcastSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class BroadcastSuite extends FunSuite with LocalSparkContext {
test("basic broadcast") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 51573254ca..33c317720c 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -1,34 +1,27 @@
package spark
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
import java.io.File
import spark.rdd._
import spark.SparkContext._
import storage.StorageLevel
-class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
+class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
initLogging()
- var sc: SparkContext = _
var checkpointDir: File = _
val partitioner = new HashPartitioner(2)
- before {
+ override def beforeEach() {
+ super.beforeEach()
checkpointDir = File.createTempFile("temp", "")
checkpointDir.delete()
-
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
}
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
-
+ override def afterEach() {
+ super.afterEach()
if (checkpointDir != null) {
checkpointDir.delete()
}
diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/spark/ClosureCleanerSuite.scala
index dfa2de80e6..b2d0dd4627 100644
--- a/core/src/test/scala/spark/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/spark/ClosureCleanerSuite.scala
@@ -3,6 +3,7 @@ package spark
import java.io.NotSerializableException
import org.scalatest.FunSuite
+import spark.LocalSparkContext._
import SparkContext._
class ClosureCleanerSuite extends FunSuite {
@@ -43,13 +44,10 @@ object TestObject {
def run(): Int = {
var nonSer = new NonSerializable
var x = 5
- val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4))
- val answer = nums.map(_ + x).reduce(_ + _)
- sc.stop()
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- return answer
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + x).reduce(_ + _)
+ }
}
}
@@ -60,11 +58,10 @@ class TestClass extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
- val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4))
- val answer = nums.map(_ + getX).reduce(_ + _)
- sc.stop()
- return answer
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + getX).reduce(_ + _)
+ }
}
}
@@ -73,11 +70,10 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
- val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4))
- val answer = nums.map(_ + getX).reduce(_ + _)
- sc.stop()
- return answer
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + getX).reduce(_ + _)
+ }
}
}
@@ -89,11 +85,10 @@ class TestClassWithoutFieldAccess {
def run(): Int = {
var nonSer2 = new NonSerializable
var x = 5
- val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4))
- val answer = nums.map(_ + x).reduce(_ + _)
- sc.stop()
- return answer
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + x).reduce(_ + _)
+ }
}
}
@@ -102,16 +97,16 @@ object TestObjectWithNesting {
def run(): Int = {
var nonSer = new NonSerializable
var answer = 0
- val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4))
- var y = 1
- for (i <- 1 to 4) {
- var nonSer2 = new NonSerializable
- var x = i
- answer += nums.map(_ + x + y).reduce(_ + _)
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ var y = 1
+ for (i <- 1 to 4) {
+ var nonSer2 = new NonSerializable
+ var x = i
+ answer += nums.map(_ + x + y).reduce(_ + _)
+ }
+ answer
}
- sc.stop()
- return answer
}
}
@@ -121,14 +116,14 @@ class TestClassWithNesting(val y: Int) extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
var answer = 0
- val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4))
- for (i <- 1 to 4) {
- var nonSer2 = new NonSerializable
- var x = i
- answer += nums.map(_ + x + getY).reduce(_ + _)
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ for (i <- 1 to 4) {
+ var nonSer2 = new NonSerializable
+ var x = i
+ answer += nums.map(_ + x + getY).reduce(_ + _)
+ }
+ answer
}
- sc.stop()
- return answer
}
}
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0487e06d12..0e2585daa4 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -15,41 +15,28 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
import storage.StorageLevel
-class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
+class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
val clusterUrl = "local-cluster[2,1,512]"
- @transient var sc: SparkContext = _
-
after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
System.clearProperty("spark.reducer.maxMbInFlight")
System.clearProperty("spark.storage.memoryFraction")
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
}
test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
- sc.stop()
- System.clearProperty("spark.master.port")
+ resetSparkContext()
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
- sc.stop()
- System.clearProperty("spark.master.port")
+ resetSparkContext()
sc = new SparkContext("local-cluster[2, 1, 512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
- sc.stop()
- System.clearProperty("spark.master.port")
+ resetSparkContext()
sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)
- sc.stop()
- System.clearProperty("spark.master.port")
- sc = null
+ resetSparkContext()
}
test("simple groupByKey") {
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index a3454f25f6..8c1445a465 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -1,7 +1,6 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import org.scalatest.prop.Checkers
import scala.collection.mutable.ArrayBuffer
@@ -23,18 +22,7 @@ object FailureSuiteState {
}
}
-class FailureSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class FailureSuite extends FunSuite with LocalSparkContext {
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index b9e1248829..f1a35bced3 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -2,17 +2,16 @@ package spark
import com.google.common.io.Files
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
-class FileServerSuite extends FunSuite with BeforeAndAfter {
+class FileServerSuite extends FunSuite with LocalSparkContext {
- @transient var sc: SparkContext = _
@transient var tmpFile: File = _
@transient var testJarFile: File = _
- before {
+ override def beforeEach() {
+ super.beforeEach()
// Create a sample text file
val tmpdir = new File(Files.createTempDir(), "test")
tmpdir.mkdir()
@@ -22,17 +21,12 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
pw.close()
}
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
+ override def afterEach() {
+ super.afterEach()
// Clean up downloaded file
if (tmpFile.exists) {
tmpFile.delete()
}
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
}
test("Distributing files locally") {
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 554bea53a9..91b48c7456 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -6,24 +6,12 @@ import scala.io.Source
import com.google.common.io.Files
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import org.apache.hadoop.io._
import SparkContext._
-class FileSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
-
+class FileSuite extends FunSuite with LocalSparkContext {
+
test("text files") {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
diff --git a/core/src/test/scala/spark/LocalSparkContext.scala b/core/src/test/scala/spark/LocalSparkContext.scala
new file mode 100644
index 0000000000..b5e31ddae3
--- /dev/null
+++ b/core/src/test/scala/spark/LocalSparkContext.scala
@@ -0,0 +1,41 @@
+package spark
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterEach
+
+/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
+trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
+
+ @transient var sc: SparkContext = _
+
+ override def afterEach() {
+ resetSparkContext()
+ super.afterEach()
+ }
+
+ def resetSparkContext() = {
+ if (sc != null) {
+ LocalSparkContext.stop(sc)
+ sc = null
+ }
+ }
+
+}
+
+object LocalSparkContext {
+ def stop(sc: SparkContext) {
+ sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+
+ /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+ def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+ try {
+ f(sc)
+ } finally {
+ stop(sc)
+ }
+ }
+
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 095f415978..7d5305f1e0 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -1,17 +1,13 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import akka.actor._
import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
import spark.util.AkkaUtils
-class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
- after {
- System.clearProperty("spark.master.port")
- }
+class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
@@ -81,7 +77,6 @@ class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
}
test("remote fetch") {
- System.clearProperty("spark.master.host")
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
System.setProperty("spark.master.port", boundPort.toString)
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index eb3c8f238f..af1107cd19 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -1,25 +1,12 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import scala.collection.mutable.ArrayBuffer
import SparkContext._
-class PartitioningSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if(sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
-
+class PartitioningSuite extends FunSuite with LocalSparkContext {
test("HashPartitioner equality") {
val p2 = new HashPartitioner(2)
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index 9b84b29227..a6344edf8f 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -1,21 +1,9 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import SparkContext._
-class PipedRDDSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class PipedRDDSuite extends FunSuite with LocalSparkContext {
test("basic pipe") {
sc = new SparkContext("local", "test")
@@ -51,5 +39,3 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
}
}
-
-
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 73846131a9..ed03e65153 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -1,22 +1,11 @@
package spark
import scala.collection.mutable.HashMap
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.FunSuite
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, PartitionPruningRDD}
-class RDDSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class RDDSuite extends FunSuite with LocalSparkContext {
test("basic operations") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index bebb8ebe86..3493b9511f 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -3,7 +3,6 @@ package spark
import scala.collection.mutable.ArrayBuffer
import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
@@ -15,18 +14,7 @@ import com.google.common.io.Files
import spark.rdd.ShuffledRDD
import spark.SparkContext._
-class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
test("groupByKey") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index 1ad11ff4c3..edb8c839fc 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -5,18 +5,7 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
import SparkContext._
-class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with Logging {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging {
test("sortByKey") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
index e9b1837d89..ff315b6693 100644
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/spark/ThreadingSuite.scala
@@ -22,19 +22,7 @@ object ThreadingSuiteState {
}
}
-class ThreadingSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if(sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
-
+class ThreadingSuite extends FunSuite with LocalSparkContext {
test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
index ba6f8b588f..a5db7103f5 100644
--- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -6,19 +6,9 @@ import spark.TaskContext
import spark.RDD
import spark.SparkContext
import spark.Split
+import spark.LocalSparkContext
-class TaskContextSuite extends FunSuite with BeforeAndAfter {
-
- var sc: SparkContext = _
-
- after {
- if (sc != null) {
- sc.stop()
- sc = null
- }
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
+class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
test("Calls executeOnCompleteCallbacks after failure") {
var completed = false