path: root/core/src/test/scala
author     Prashant Sharma <prashant.s@imaginea.com>  2013-12-24 18:30:31 +0530
committer  Prashant Sharma <scrapcodes@gmail.com>     2013-12-25 00:09:36 +0530
commit     2573add94cf920a88f74d80d8ea94218d812704d (patch)
tree       9eb07c85cadbaea90a8e9742687adda924342b42 /core/src/test/scala
parent     0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
download   spark-2573add94cf920a88f74d80d8ea94218d812704d.tar.gz
           spark-2573add94cf920a88f74d80d8ea94218d812704d.tar.bz2
           spark-2573add94cf920a88f74d80d8ea94218d812704d.zip
spark-544, introducing SparkConf and related configuration overhaul.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 95
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala | 4
10 files changed, 86 insertions(+), 82 deletions(-)
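
Taken together, these test changes replace global JVM properties with an explicit SparkConf that each suite constructs and threads into the classes under test. A minimal sketch of the pattern, restricted to calls that appear in the hunks below (key names and values are illustrative, not prescriptive):

    import org.apache.spark.{MapOutputTracker, MapOutputTrackerMaster, SparkConf}
    import org.apache.spark.util.AkkaUtils

    // Each suite builds its own configuration instead of mutating System properties.
    val conf = new SparkConf
    conf.set("spark.kryoserializer.buffer.mb", "1")                       // was System.setProperty(...)
    val wait = conf.getOrElse("spark.locality.wait", "3000").toLong       // was System.getProperty(key, default)

    // Constructors and factories that used to read properties internally now take the conf explicitly.
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0, conf = conf)
    val masterTracker = new MapOutputTrackerMaster(conf)                  // driver-side tracker
    val slaveTracker = new MapOutputTracker(conf)                         // worker-side tracker, same conf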
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 271dc905bc..10b8b441fd 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
-
+ private val conf = new SparkConf
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@@ -48,14 +48,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
@@ -74,7 +74,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTrackerMaster()
+ val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
@@ -96,16 +96,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
- val masterTracker = new MapOutputTrackerMaster()
+ val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = Left(actorSystem.actorOf(
Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
- val slaveTracker = new MapOutputTracker()
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
+ val slaveTracker = new MapOutputTracker(conf)
slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
"akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 7181333adf..4ecdde0001 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -19,17 +19,19 @@ package org.apache.spark.metrics
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.SparkConf
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
-
+ var conf: SparkConf = null
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
System.setProperty("spark.metrics.conf", filePath)
+ conf = new SparkConf
}
test("MetricsSystem with default config") {
- val metricsSystem = MetricsSystem.createMetricsSystem("default")
+ val metricsSystem = MetricsSystem.createMetricsSystem("default", conf)
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
@@ -39,7 +41,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
}
test("MetricsSystem with sources add") {
- val metricsSystem = MetricsSystem.createMetricsSystem("test")
+ val metricsSystem = MetricsSystem.createMetricsSystem("test", conf)
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
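
createMetricsSystem now takes the conf alongside the instance name, so the conf built in before {} (after spark.metrics.conf has been set) is what the metrics system reads. A sketch of the updated call, with the instance name arbitrary:

    val conf = new SparkConf
    val metricsSystem = MetricsSystem.createMetricsSystem("default", conf)
    val sources = metricsSystem.sources
    val sinks = metricsSystem.sinks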
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 706d84a58b..2aa259daf3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,21 +17,14 @@
package org.apache.spark.scheduler
-import scala.collection.mutable.{Map, HashMap}
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.LocalSparkContext
-import org.apache.spark.MapOutputTrackerMaster
-import org.apache.spark.SparkContext
-import org.apache.spark.Partition
-import org.apache.spark.TaskContext
-import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
-import org.apache.spark.{FetchFailed, Success, TaskEndReason}
+import scala.Tuple2
+import scala.collection.mutable.{HashMap, Map}
+
+import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.scalatest.{BeforeAndAfter, FunSuite}
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
@@ -46,7 +39,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
* and capturing the resulting TaskSets from the mock TaskScheduler.
*/
class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
+ val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
@@ -74,7 +67,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
*/
val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
// stub out BlockManagerMaster.getLocations to use our cacheLocations
- val blockManagerMaster = new BlockManagerMaster(null) {
+ val blockManagerMaster = new BlockManagerMaster(null, conf) {
override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
blockIds.map {
_.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
@@ -99,7 +92,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
taskSets.clear()
cacheLocations.clear()
results.clear()
- mapOutputTracker = new MapOutputTrackerMaster()
+ mapOutputTracker = new MapOutputTrackerMaster(conf)
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
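
In this suite the stubbed BlockManagerMaster and the MapOutputTrackerMaster both receive the suite-level conf before the scheduler is built. Roughly, with the stubbing details elided:

    val conf = new SparkConf
    val blockManagerMaster = new BlockManagerMaster(null, conf)      // actor ref stubbed out, as in the suite
    val mapOutputTracker = new MapOutputTrackerMaster(conf)
    // taskScheduler and sc come from the suite's own setup.
    val scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env)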
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 002368ff55..dd122615ad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -95,7 +95,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
- val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
+ val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER)
joblogger.getLogDir should be ("/tmp/spark-%s".format(user))
joblogger.getJobIDtoPrintWriter.size should be (1)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 95d3553d91..34d2e4cb8c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -169,7 +169,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()
assert(rootPool.getSchedulableByName("default") != null)
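
Suites that already run against a live SparkContext do not build a fresh conf; they hand the context's own configuration to the class under test, as the fair-scheduler setup above does (inside the suite's test body):

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)   // sc is the suite's SparkContext
    schedulableBuilder.buildPools()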
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index bb28a31a99..2bb827c022 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -81,8 +81,8 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
-
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+ private val conf = new SparkConf
+ val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
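
Reading a tunable with a fallback moves from System.getProperty to SparkConf.getOrElse, as in the locality-wait constant above:

    val conf = new SparkConf
    // Before: System.getProperty("spark.locality.wait", "3000").toLong
    val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong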
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
index 27c2d53361..618fae7c16 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext, SparkEnv}
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
import org.apache.spark.storage.TaskResultBlockId
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 5b4d63b954..4ef5538951 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,8 +31,10 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.util.{SizeEstimator, Utils, AkkaUtils, ByteBufferInputStream}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.{SparkConf, SparkContext}
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+ private val conf = new SparkConf
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
@@ -42,7 +44,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- System.setProperty("spark.kryoserializer.buffer.mb", "1")
+ conf.set("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
// Implicitly convert strings to BlockIds for test clarity.
@@ -50,22 +52,23 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
before {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
this.actorSystem = actorSystem
- System.setProperty("spark.driver.port", boundPort.toString)
- System.setProperty("spark.hostPort", "localhost:" + boundPort)
+ conf.set("spark.driver.port", boundPort.toString)
+ conf.set("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))))
+ Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- oldArch = System.setProperty("os.arch", "amd64")
- oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
+ System.setProperty("os.arch", "amd64")
+ conf.set("os.arch", "amd64")
+ conf.set("spark.test.useCompressedOops", "true")
+ conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
// Set some value ...
- System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111)
+ conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
}
after {
@@ -86,13 +89,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master = null
if (oldArch != null) {
- System.setProperty("os.arch", oldArch)
+ conf.set("os.arch", oldArch)
} else {
System.clearProperty("os.arch")
}
if (oldOops != null) {
- System.setProperty("spark.test.useCompressedOops", oldOops)
+ conf.set("spark.test.useCompressedOops", oldOops)
} else {
System.clearProperty("spark.test.useCompressedOops")
}
@@ -133,7 +136,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 1 manager interaction") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -163,8 +166,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("master + 2 managers interaction") {
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
+ store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000, conf)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -179,7 +182,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -227,7 +230,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("removing rdd") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -261,7 +264,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -277,7 +280,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("reregistration on block update") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
@@ -296,7 +299,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("reregistration doesn't dead lock") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf)
val a1 = new Array[Byte](400)
val a2 = List(new Array[Byte](400))
@@ -333,7 +336,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -352,7 +355,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -371,7 +374,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -390,7 +393,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -413,7 +416,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -426,7 +429,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -441,7 +444,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -456,7 +459,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -471,7 +474,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -486,7 +489,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -511,7 +514,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -535,7 +538,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -581,7 +584,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -591,53 +594,53 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
- System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+ conf.set("spark.shuffle.compress", "true")
+ store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
+ conf.set("spark.shuffle.compress", "false")
+ store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf)
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
"shuffle_0_0_0 was compressed")
store.stop()
store = null
- System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
+ conf.set("spark.broadcast.compress", "true")
+ store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf)
store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
"broadcast_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
+ conf.set("spark.broadcast.compress", "false")
+ store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf)
store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
- System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
+ conf.set("spark.rdd.compress", "true")
+ store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf)
store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
- System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
+ conf.set("spark.rdd.compress", "false")
+ store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf)
store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
+ store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()
@@ -651,7 +654,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+ store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200, conf)
// The put should fail since a1 is not serializable.
class UnserializableClass
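
The block-compression test earlier in this file is the clearest before/after: per-case System.setProperty calls become conf.set calls, and every BlockManager constructor takes the conf as its final argument. One trimmed compress/uncompress pair from that hunk (conf, actorSystem, master, and serializer come from the suite's setup):

    var store: BlockManager = null

    conf.set("spark.shuffle.compress", "true")
    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
    store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
    assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed")
    store.stop()

    conf.set("spark.shuffle.compress", "false")
    store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf)
    store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
    assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000, "shuffle_0_0_0 was compressed")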
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 070982e798..f940448abd 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -17,15 +17,18 @@
package org.apache.spark.storage
-import java.io.{FileWriter, File}
+import java.io.{File, FileWriter}
import scala.collection.mutable
import com.google.common.io.Files
+import org.apache.spark.SparkConf
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+import scala.util.Try
+import akka.actor.{Props, ActorSelection, ActorSystem}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
-
+ private val testConf = new SparkConf
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
val rootDir1 = Files.createTempDir()
@@ -36,10 +39,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
// This suite focuses primarily on consolidation features,
// so we coerce consolidation if not already enabled.
val consolidateProp = "spark.shuffle.consolidateFiles"
- val oldConsolidate = Option(System.getProperty(consolidateProp))
- System.setProperty(consolidateProp, "true")
+ val oldConsolidate = Try(testConf.get(consolidateProp)).toOption
+ testConf.set(consolidateProp, "true")
val shuffleBlockManager = new ShuffleBlockManager(null) {
+ override def conf = testConf.clone
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
}
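
Because a bare conf.get can fail for an unset key (hence the Try in the hunk above), the suite recovers the optional behavior that System.getProperty provided. A sketch of the idiom, assuming testConf is the suite's SparkConf:

    import scala.util.Try

    val consolidateProp = "spark.shuffle.consolidateFiles"
    val oldConsolidate = Try(testConf.get(consolidateProp)).toOption   // None when the key is unset
    testConf.set(consolidateProp, "true")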
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 5aff26f9fc..a5facd5bbd 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkContext
class DummyClass1 {}
@@ -139,7 +140,8 @@ class SizeEstimatorSuite
test("64-bit arch with no compressed oops") {
val arch = System.setProperty("os.arch", "amd64")
val oops = System.setProperty("spark.test.useCompressedOops", "false")
-
+ SparkContext.globalConf.set("os.arch", "amd64")
+ SparkContext.globalConf.set("spark.test.useCompressedOops", "false")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
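
During the transition, this test sets both the JVM properties and the commit's new global conf before forcing SizeEstimator to re-read them; condensed from the hunk above:

    System.setProperty("os.arch", "amd64")
    System.setProperty("spark.test.useCompressedOops", "false")
    SparkContext.globalConf.set("os.arch", "amd64")
    SparkContext.globalConf.set("spark.test.useCompressedOops", "false")

    val initialize = PrivateMethod[Unit]('initialize)
    SizeEstimator invokePrivate initialize()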