 core/src/test/scala/org/apache/spark/DistributedSuite.scala | 21
 core/src/test/scala/org/apache/spark/FileServerSuite.scala | 16
 core/src/test/scala/org/apache/spark/JobCancellationSuite.scala | 21
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 22
 core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 51
 core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala | 31
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 62
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 6
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 9
 core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 23
 core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 6
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 23
 core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala | 2
 core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala | 57
 core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 38
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 2
 examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala | 6
 external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 8
 external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 8
 external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 8
 external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 8
 streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 8
 tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 12
 23 files changed, 216 insertions(+), 232 deletions(-)
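
Taken together, these changes move the test suites off global JVM system properties and onto per-context SparkConf instances (with a ResetSystemProperties mixin for suites that still have to touch globals). A minimal sketch of the recurring before/after pattern, drawn from the DistributedSuite hunks below:

    // Before: mutates process-wide state that must be cleaned up by hand.
    System.setProperty("spark.reducer.maxMbInFlight", "1")
    sc = new SparkContext(clusterUrl, "test")
    // ... run the test ...
    System.clearProperty("spark.reducer.maxMbInFlight")

    // After: the setting is scoped to this SparkContext only.
    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
    sc = new SparkContext(clusterUrl, "test", conf)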
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 998f3008ec..97ea3578aa 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark
-import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
@@ -29,16 +28,10 @@ class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
-class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
- with LocalSparkContext {
+class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
val clusterUrl = "local-cluster[2,1,512]"
- after {
- System.clearProperty("spark.reducer.maxMbInFlight")
- System.clearProperty("spark.storage.memoryFraction")
- }
-
test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
// this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -84,15 +77,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("groupByKey where map output sizes exceed maxMbInFlight") {
- System.setProperty("spark.reducer.maxMbInFlight", "1")
- sc = new SparkContext(clusterUrl, "test")
+ val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+ sc = new SparkContext(clusterUrl, "test", conf)
// This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
// file should be about 2.5 MB
val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
assert(groups.length === 16)
assert(groups.map(_._2).sum === 2000)
- // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
}
test("accumulators") {
@@ -210,7 +202,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("compute without caching when no partitions fit in memory") {
- System.setProperty("spark.storage.memoryFraction", "0.0001")
sc = new SparkContext(clusterUrl, "test")
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
@@ -218,12 +209,11 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
- System.clearProperty("spark.storage.memoryFraction")
}
test("compute when only some partitions fit in memory") {
- System.setProperty("spark.storage.memoryFraction", "0.01")
- sc = new SparkContext(clusterUrl, "test")
+ val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
+ sc = new SparkContext(clusterUrl, "test", conf)
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
// to make sure that *some* of them do fit though
@@ -231,7 +221,6 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
- System.clearProperty("spark.storage.memoryFraction")
}
test("passing environment variables to cluster") {
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 49426545c7..0f49ce4754 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -31,10 +31,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
@transient var tmpJarUrl: String = _
+ def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
+
override def beforeEach() {
super.beforeEach()
resetSparkContext()
- System.setProperty("spark.authenticate", "false")
}
override def beforeAll() {
@@ -52,7 +53,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarFile = new File(testTempDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
- System.setProperty("spark.authenticate", "false")
val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
@@ -74,7 +74,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}
test("Distributing files locally") {
- sc = new SparkContext("local[4]", "test")
+ sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -108,7 +108,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test("Distributing files locally using URL as input") {
// addFile("file:///....")
- sc = new SparkContext("local[4]", "test")
+ sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(new File(tmpFile.toString).toURI.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -122,7 +122,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}
test ("Dynamically adding JARS locally") {
- sc = new SparkContext("local[4]", "test")
+ sc = new SparkContext("local[4]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
@@ -133,7 +133,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}
test("Distributing files on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,512]", "test")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -147,7 +147,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}
test ("Dynamically adding JARS on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,512]", "test")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
@@ -158,7 +158,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
- sc = new SparkContext("local-cluster[1,1,512]", "test")
+ sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 41ed2bce55..7584ae79fc 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -40,12 +40,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
override def afterEach() {
super.afterEach()
resetSparkContext()
- System.clearProperty("spark.scheduler.mode")
}
test("local mode, FIFO scheduler") {
- System.setProperty("spark.scheduler.mode", "FIFO")
- sc = new SparkContext("local[2]", "test")
+ val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+ sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
@@ -53,10 +52,10 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("local mode, fair scheduler") {
- System.setProperty("spark.scheduler.mode", "FAIR")
+ val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.scheduler.allocation.file", xmlPath)
- sc = new SparkContext("local[2]", "test")
+ conf.set("spark.scheduler.allocation.file", xmlPath)
+ sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
@@ -64,8 +63,8 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("cluster mode, FIFO scheduler") {
- System.setProperty("spark.scheduler.mode", "FIFO")
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
@@ -73,10 +72,10 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("cluster mode, fair scheduler") {
- System.setProperty("spark.scheduler.mode", "FAIR")
+ val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.scheduler.allocation.file", xmlPath)
- sc = new SparkContext("local-cluster[2,1,512]", "test")
+ conf.set("spark.scheduler.allocation.file", xmlPath)
+ sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58a96245a9..f57921b768 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -35,19 +35,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
conf.set("spark.test.noStageRetry", "true")
test("groupByKey without compression") {
- try {
- System.setProperty("spark.shuffle.compress", "false")
- sc = new SparkContext("local", "test", conf)
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
- val groups = pairs.groupByKey(4).collect()
- assert(groups.size === 2)
- val valuesFor1 = groups.find(_._1 == 1).get._2
- assert(valuesFor1.toList.sorted === List(1, 2, 3))
- val valuesFor2 = groups.find(_._1 == 2).get._2
- assert(valuesFor2.toList.sorted === List(1))
- } finally {
- System.setProperty("spark.shuffle.compress", "true")
- }
+ val myConf = conf.clone().set("spark.shuffle.compress", "false")
+ sc = new SparkContext("local", "test", myConf)
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+ val groups = pairs.groupByKey(4).collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
}
test("shuffle non-zero block size") {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 5d018ea986..790976a5ac 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,27 +19,20 @@ package org.apache.spark
import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
+import org.apache.spark.util.ResetSystemProperties
import com.esotericsoftware.kryo.Kryo
-class SparkConfSuite extends FunSuite with LocalSparkContext {
+class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("loading from system properties") {
- try {
- System.setProperty("spark.test.testProperty", "2")
- val conf = new SparkConf()
- assert(conf.get("spark.test.testProperty") === "2")
- } finally {
- System.clearProperty("spark.test.testProperty")
- }
+ System.setProperty("spark.test.testProperty", "2")
+ val conf = new SparkConf()
+ assert(conf.get("spark.test.testProperty") === "2")
}
test("initializing without loading defaults") {
- try {
- System.setProperty("spark.test.testProperty", "2")
- val conf = new SparkConf(false)
- assert(!conf.contains("spark.test.testProperty"))
- } finally {
- System.clearProperty("spark.test.testProperty")
- }
+ System.setProperty("spark.test.testProperty", "2")
+ val conf = new SparkConf(false)
+ assert(!conf.contains("spark.test.testProperty"))
}
test("named set methods") {
@@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
test("nested property names") {
// This wasn't supported by some external conf parsing libraries
- try {
- System.setProperty("spark.test.a", "a")
- System.setProperty("spark.test.a.b", "a.b")
- System.setProperty("spark.test.a.b.c", "a.b.c")
- val conf = new SparkConf()
- assert(conf.get("spark.test.a") === "a")
- assert(conf.get("spark.test.a.b") === "a.b")
- assert(conf.get("spark.test.a.b.c") === "a.b.c")
- conf.set("spark.test.a.b", "A.B")
- assert(conf.get("spark.test.a") === "a")
- assert(conf.get("spark.test.a.b") === "A.B")
- assert(conf.get("spark.test.a.b.c") === "a.b.c")
- } finally {
- System.clearProperty("spark.test.a")
- System.clearProperty("spark.test.a.b")
- System.clearProperty("spark.test.a.b.c")
- }
+ System.setProperty("spark.test.a", "a")
+ System.setProperty("spark.test.a.b", "a.b")
+ System.setProperty("spark.test.a.b.c", "a.b.c")
+ val conf = new SparkConf()
+ assert(conf.get("spark.test.a") === "a")
+ assert(conf.get("spark.test.a.b") === "a.b")
+ assert(conf.get("spark.test.a.b.c") === "a.b.c")
+ conf.set("spark.test.a.b", "A.B")
+ assert(conf.get("spark.test.a") === "a")
+ assert(conf.get("spark.test.a.b") === "A.B")
+ assert(conf.get("spark.test.a.b.c") === "a.b.c")
}
test("register kryo classes through registerKryoClasses") {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 0390a2e4f1..8ae4f243ec 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
- def createTaskScheduler(master: String): TaskSchedulerImpl = {
+ def createTaskScheduler(master: String): TaskSchedulerImpl =
+ createTaskScheduler(master, new SparkConf())
+
+ def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test", conf)
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
@@ -102,19 +105,13 @@ class SparkContextSchedulerCreationSuite
}
test("local-default-parallelism") {
- val defaultParallelism = System.getProperty("spark.default.parallelism")
- System.setProperty("spark.default.parallelism", "16")
- val sched = createTaskScheduler("local")
+ val conf = new SparkConf().set("spark.default.parallelism", "16")
+ val sched = createTaskScheduler("local", conf)
sched.backend match {
case s: LocalBackend => assert(s.defaultParallelism() === 16)
case _ => fail()
}
-
- Option(defaultParallelism) match {
- case Some(v) => System.setProperty("spark.default.parallelism", v)
- case _ => System.clearProperty("spark.default.parallelism")
- }
}
test("simr") {
@@ -155,9 +152,10 @@ class SparkContextSchedulerCreationSuite
testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
}
- def testMesos(master: String, expectedClass: Class[_]) {
+ def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
+ val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
try {
- val sched = createTaskScheduler(master)
+ val sched = createTaskScheduler(master, conf)
assert(sched.backend.getClass === expectedClass)
} catch {
case e: UnsatisfiedLinkError =>
@@ -168,17 +166,14 @@ class SparkContextSchedulerCreationSuite
}
test("mesos fine-grained") {
- System.setProperty("spark.mesos.coarse", "false")
- testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+ testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse = false)
}
test("mesos coarse-grained") {
- System.setProperty("spark.mesos.coarse", "true")
- testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+ testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], coarse = true)
}
test("mesos with zookeeper") {
- System.setProperty("spark.mesos.coarse", "false")
- testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend])
+ testMesos("zk://localhost:1234,localhost:2345", classOf[MesosSchedulerBackend], coarse = false)
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 1362022104..8b3c6871a7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,55 +23,37 @@ import org.apache.hadoop.io.BytesWritable
class SparkContextSuite extends FunSuite with LocalSparkContext {
- /** Allows system properties to be changed in tests */
- private def withSystemProperty[T](property: String, value: String)(block: => T): T = {
- val originalValue = System.getProperty(property)
- try {
- System.setProperty(property, value)
- block
- } finally {
- if (originalValue == null) {
- System.clearProperty(property)
- } else {
- System.setProperty(property, originalValue)
- }
- }
- }
-
test("Only one SparkContext may be active at a time") {
// Regression test for SPARK-4180
- withSystemProperty("spark.driver.allowMultipleContexts", "false") {
- val conf = new SparkConf().setAppName("test").setMaster("local")
- sc = new SparkContext(conf)
- // A SparkContext is already running, so we shouldn't be able to create a second one
- intercept[SparkException] { new SparkContext(conf) }
- // After stopping the running context, we should be able to create a new one
- resetSparkContext()
- sc = new SparkContext(conf)
- }
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+ .set("spark.driver.allowMultipleContexts", "false")
+ sc = new SparkContext(conf)
+ // A SparkContext is already running, so we shouldn't be able to create a second one
+ intercept[SparkException] { new SparkContext(conf) }
+ // After stopping the running context, we should be able to create a new one
+ resetSparkContext()
+ sc = new SparkContext(conf)
}
test("Can still construct a new SparkContext after failing to construct a previous one") {
- withSystemProperty("spark.driver.allowMultipleContexts", "false") {
- // This is an invalid configuration (no app name or master URL)
- intercept[SparkException] {
- new SparkContext(new SparkConf())
- }
- // Even though those earlier calls failed, we should still be able to create a new context
- sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
+ val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
+ // This is an invalid configuration (no app name or master URL)
+ intercept[SparkException] {
+ new SparkContext(conf)
}
+ // Even though those earlier calls failed, we should still be able to create a new context
+ sc = new SparkContext(conf.setMaster("local").setAppName("test"))
}
test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
- withSystemProperty("spark.driver.allowMultipleContexts", "true") {
- var secondSparkContext: SparkContext = null
- try {
- val conf = new SparkConf().setAppName("test").setMaster("local")
- sc = new SparkContext(conf)
- secondSparkContext = new SparkContext(conf)
- } finally {
- Option(secondSparkContext).foreach(_.stop())
- }
+ var secondSparkContext: SparkContext = null
+ try {
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+ .set("spark.driver.allowMultipleContexts", "true")
+ sc = new SparkContext(conf)
+ secondSparkContext = new SparkContext(conf)
+ } finally {
+ Option(secondSparkContext).foreach(_.stop())
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index eb7bd7ab39..5eda2d41f0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,11 +23,13 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.deploy.SparkSubmit._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ResetSystemProperties, Utils}
import org.scalatest.FunSuite
import org.scalatest.Matchers
-class SparkSubmitSuite extends FunSuite with Matchers {
+// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
+// of properties that need to be cleared after tests.
+class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
def beforeAll() {
System.setProperty("spark.testing", "true")
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index b276343cb4..24f41bf8cc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,9 +26,10 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkContext}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.ResetSystemProperties
-class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
- with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers with BeforeAndAfter
+ with BeforeAndAfterAll with ResetSystemProperties {
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
@@ -37,10 +38,6 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
sc = new SparkContext("local", "SparkListenerSuite")
}
- override def afterAll() {
- System.clearProperty("spark.akka.frameSize")
- }
-
test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 5768a3a733..3aab5a156e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.TaskResultBlockId
/**
@@ -55,27 +55,20 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
/**
* Tests related to handling task results (both direct and indirect).
*/
-class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
- with LocalSparkContext {
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
- override def beforeAll {
- // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
- // as we can make it) so the tests don't take too long.
- System.setProperty("spark.akka.frameSize", "1")
- }
-
- override def afterAll {
- System.clearProperty("spark.akka.frameSize")
- }
+ // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+ // as we can make it) so the tests don't take too long.
+ def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
test("handling results smaller than Akka frame size") {
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test", conf)
val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
assert(result === 2)
}
test("handling results larger than Akka frame size") {
- sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test", conf)
val akkaFrameSize =
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
@@ -89,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
test("task retried if result missing from block manager") {
// Set the maximum number of task failures to > 0, so that the task set isn't aborted
// after the result is missing.
- sc = new SparkContext("local[1,2]", "test")
+ sc = new SparkContext("local[1,2]", "test", conf)
// If this test hangs, it's probably because no resource offers were made after the task
// failed.
val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 7532da88c6..40aaf9dd1f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -162,12 +162,12 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
}
test("Fair Scheduler Test") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+ val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
+ sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
val taskScheduler = new TaskSchedulerImpl(sc)
val taskSet = FakeTask.createTaskSet(1)
- val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- System.setProperty("spark.scheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 5554efbcba..ffe6f03914 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -33,7 +33,7 @@ import akka.util.Timeout
import org.mockito.Mockito.{mock, when}
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
+import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
@@ -44,18 +44,17 @@ import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import org.apache.spark.util._
-class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
- with PrivateMethodTester {
+class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
+ with PrivateMethodTester with ResetSystemProperties {
private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
- var oldArch: String = null
conf.set("spark.authenticate", "false")
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -79,13 +78,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
manager
}
- before {
+ override def beforeEach(): Unit = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"test", "localhost", 0, conf = conf, securityManager = securityMgr)
this.actorSystem = actorSystem
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- oldArch = System.setProperty("os.arch", "amd64")
+ System.setProperty("os.arch", "amd64")
conf.set("os.arch", "amd64")
conf.set("spark.test.useCompressedOops", "true")
conf.set("spark.driver.port", boundPort.toString)
@@ -100,7 +99,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
SizeEstimator invokePrivate initialize()
}
- after {
+ override def afterEach(): Unit = {
if (store != null) {
store.stop()
store = null
@@ -113,14 +112,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
actorSystem.awaitTermination()
actorSystem = null
master = null
-
- if (oldArch != null) {
- conf.set("os.arch", oldArch)
- } else {
- System.clearProperty("os.arch")
- }
-
- System.clearProperty("spark.test.useCompressedOops")
}
test("StorageLevel object caching") {
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 7bca1711ae..6bbf72e929 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.BlockManagerId
/**
* Test the AkkaUtils with various security settings.
*/
-class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
+class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("remote fetch security bad password") {
val conf = new SparkConf
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
new file mode 100644
index 0000000000..d4b92f33dd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Properties
+
+import org.scalatest.{BeforeAndAfterEach, Suite}
+
+/**
+ * Mixin for automatically resetting system properties that are modified in ScalaTest tests.
+ * This resets the properties after each individual test.
+ *
+ * The order in which fixtures are mixed in affects the order in which they are invoked by tests.
+ * If we have a suite `MySuite extends FunSuite with Foo with Bar`, then
+ * Bar's `super` is Foo, so Bar's beforeEach() and afterEach() methods will be invoked first
+ * by the test runner.
+ *
+ * This means that ResetSystemProperties should appear as the last trait in test suites that it's
+ * mixed into in order to ensure that the system properties snapshot occurs as early as possible.
+ * ResetSystemProperties calls super.afterEach() before performing its own cleanup, ensuring that
+ * the old properties are restored as late as possible.
+ *
+ * See the "Composing fixtures by stacking traits" section at
+ * http://www.scalatest.org/user_guide/sharing_fixtures for more details about this pattern.
+ */
+private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Suite =>
+ var oldProperties: Properties = null
+
+ override def beforeEach(): Unit = {
+    oldProperties = System.getProperties.clone().asInstanceOf[Properties] // copy, not a view
+ super.beforeEach()
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ super.afterEach()
+ } finally {
+ System.setProperties(oldProperties)
+ oldProperties = null
+ }
+ }
+}
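
As the scaladoc above notes, ResetSystemProperties should be the last trait mixed in so that its snapshot brackets every other fixture. A hedged usage sketch (the suite and property names are illustrative, not part of this change):

    import org.scalatest.FunSuite

    import org.apache.spark.SparkConf
    import org.apache.spark.util.ResetSystemProperties

    class ExamplePropertiesSuite extends FunSuite with ResetSystemProperties {
      test("property changes do not leak between tests") {
        // Mutate a global property; ResetSystemProperties.afterEach() restores the
        // pre-test snapshot, so later tests will not observe this value.
        System.setProperty("spark.test.exampleProperty", "2")
        assert(new SparkConf().get("spark.test.exampleProperty") === "2")
      }
    }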
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 0ea2d13a83..7424c2e91d 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -17,9 +17,7 @@
package org.apache.spark.util
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, PrivateMethodTester}
class DummyClass1 {}
@@ -46,20 +44,12 @@ class DummyString(val arr: Array[Char]) {
}
class SizeEstimatorSuite
- extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+ extends FunSuite with BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties {
- var oldArch: String = _
- var oldOops: String = _
-
- override def beforeAll() {
+ override def beforeEach() {
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- oldArch = System.setProperty("os.arch", "amd64")
- oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- }
-
- override def afterAll() {
- resetOrClear("os.arch", oldArch)
- resetOrClear("spark.test.useCompressedOops", oldOops)
+ System.setProperty("os.arch", "amd64")
+ System.setProperty("spark.test.useCompressedOops", "true")
}
test("simple classes") {
@@ -122,7 +112,7 @@ class SizeEstimatorSuite
}
test("32-bit arch") {
- val arch = System.setProperty("os.arch", "x86")
+ System.setProperty("os.arch", "x86")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
@@ -131,14 +121,13 @@ class SizeEstimatorSuite
assertResult(48)(SizeEstimator.estimate(DummyString("a")))
assertResult(48)(SizeEstimator.estimate(DummyString("ab")))
assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
- resetOrClear("os.arch", arch)
}
// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("64-bit arch with no compressed oops") {
- val arch = System.setProperty("os.arch", "amd64")
- val oops = System.setProperty("spark.test.useCompressedOops", "false")
+ System.setProperty("os.arch", "amd64")
+ System.setProperty("spark.test.useCompressedOops", "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
@@ -146,16 +135,5 @@ class SizeEstimatorSuite
assertResult(64)(SizeEstimator.estimate(DummyString("a")))
assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
-
- resetOrClear("os.arch", arch)
- resetOrClear("spark.test.useCompressedOops", oops)
- }
-
- def resetOrClear(prop: String, oldValue: String) {
- if (oldValue != null) {
- System.setProperty(prop, oldValue)
- } else {
- System.clearProperty(prop)
- }
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f9d4bea823..4544382094 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkConf
-class UtilsSuite extends FunSuite {
+class UtilsSuite extends FunSuite with ResetSystemProperties {
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index adecd93435..1b53f3edbe 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -28,11 +28,9 @@ object BroadcastTest {
val bcName = if (args.length > 2) args(2) else "Http"
val blockSize = if (args.length > 3) args(3) else "4096"
- System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
- "BroadcastFactory")
- System.setProperty("spark.broadcast.blockSize", blockSize)
val sparkConf = new SparkConf().setAppName("Broadcast Test")
-
+ .set("spark.broadcast.factory", s"org.apache.spark.broadcast.${bcName}BroaddcastFactory")
+ .set("spark.broadcast.blockSize", blockSize)
val sc = new SparkContext(sparkConf)
val slices = if (args.length > 0) args(0).toInt else 2
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019000..1e24da7f5f 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019000..1e24da7f5f 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019000..1e24da7f5f 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019000..1e24da7f5f 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019000..1e24da7f5f 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;
import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
- System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index db58eb642b..15ee95070a 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util.Utils
@@ -49,13 +49,13 @@ object StoragePerfTester {
val writeData = "1" * recordLength
val executor = Executors.newFixedThreadPool(numMaps)
- System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.shuffle.sync", "true")
- System.setProperty("spark.shuffle.manager",
- "org.apache.spark.shuffle.hash.HashShuffleManager")
+ val conf = new SparkConf()
+ .set("spark.shuffle.compress", "false")
+ .set("spark.shuffle.sync", "true")
+ .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
// This is only used to instantiate a BlockManager. All thread scheduling is done manually.
- val sc = new SparkContext("local[4]", "Write Tester")
+ val sc = new SparkContext("local[4]", "Write Tester", conf)
val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
def writeOutputBytes(mapId: Int, total: AtomicLong) = {