author      Josh Rosen <joshrosen@databricks.com>  2016-01-14 17:37:27 -0800
committer   Andrew Or <andrew@databricks.com>      2016-01-14 17:37:27 -0800
commit      25782981cf58946dc7c186acadd2beec5d964461 (patch)
tree        ae34e5572284dd00b709d9ab1ae5f1d1446a1f34 /core
parent      bcc7373f673d1a51b48fb95432ba5c4644dd5d23 (diff)
[SPARK-12174] Speed up BlockManagerSuite getRemoteBytes() test
This patch significantly speeds up the BlockManagerSuite's "SPARK-9591: getRemoteBytes from another location when Exception throw" test, reducing the test time from 45s to ~250ms. The key change was to set `spark.shuffle.io.maxRetries` to 0 (the code previously set `spark.network.timeout` to `2s`, but this didn't make a difference because the slowdown was not due to this timeout).

Along the way, I also cleaned up the way that we handle SparkConf in BlockManagerSuite: previously, each test would mutate a shared SparkConf instance, while now each test gets a fresh SparkConf.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10759 from JoshRosen/SPARK-12174.
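For context on the key change: Spark's Netty block-transfer layer retries a failed remote fetch several times, waiting between attempts, before giving up, so a fetch aimed at an executor that has already been stopped can stall for tens of seconds. Setting the retry count to 0 makes the first failure surface immediately. A minimal sketch of the fail-fast configuration, assuming only spark-core on the classpath (the default values cited in the comments are, to the best of my knowledge, those of Spark's transport layer at the time, and are shown purely for illustration):

    import org.apache.spark.SparkConf

    // Fail fast on remote block-fetch errors instead of retrying with backoff.
    // By default the transport layer retried a failed fetch (3 attempts with a
    // 5s wait between them at the time of this commit), which is what made the
    // old test take ~45s.
    val conf = new SparkConf(false) // loadDefaults = false: ignore system properties
      .set("spark.app.id", "test")
      .set("spark.shuffle.io.maxRetries", "0") // 0 = no retries; first failure propagates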
Diffstat (limited to 'core')
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 71
1 file changed, 30 insertions(+), 41 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 67210e5d4c..62e6c4f793 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,20 +45,18 @@ import org.apache.spark.util._
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
with PrivateMethodTester with ResetSystemProperties {
- private val conf = new SparkConf(false).set("spark.app.id", "test")
+ var conf: SparkConf = null
var store: BlockManager = null
var store2: BlockManager = null
var store3: BlockManager = null
var rpcEnv: RpcEnv = null
var master: BlockManagerMaster = null
- conf.set("spark.authenticate", "false")
- val securityMgr = new SecurityManager(conf)
- val mapOutputTracker = new MapOutputTrackerMaster(conf)
- val shuffleManager = new HashShuffleManager(conf)
+ val securityMgr = new SecurityManager(new SparkConf(false))
+ val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
+ val shuffleManager = new HashShuffleManager(new SparkConf(false))
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- conf.set("spark.kryoserializer.buffer", "1m")
- val serializer = new KryoSerializer(conf)
+ val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -79,15 +77,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
override def beforeEach(): Unit = {
super.beforeEach()
- rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
-
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
System.setProperty("os.arch", "amd64")
- conf.set("os.arch", "amd64")
- conf.set("spark.test.useCompressedOops", "true")
+ conf = new SparkConf(false)
+ .set("spark.app.id", "test")
+ .set("spark.kryoserializer.buffer", "1m")
+ .set("spark.test.useCompressedOops", "true")
+ .set("spark.storage.unrollFraction", "0.4")
+ .set("spark.storage.unrollMemoryThreshold", "512")
+
+ rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
- conf.set("spark.storage.unrollFraction", "0.4")
- conf.set("spark.storage.unrollMemoryThreshold", "512")
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
@@ -98,6 +98,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
override def afterEach(): Unit = {
try {
+ conf = null
if (store != null) {
store.stop()
store = null
@@ -473,34 +474,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
- val origTimeoutOpt = conf.getOption("spark.network.timeout")
- try {
- conf.set("spark.network.timeout", "2s")
- store = makeBlockManager(8000, "executor1")
- store2 = makeBlockManager(8000, "executor2")
- store3 = makeBlockManager(8000, "executor3")
- val list1 = List(new Array[Byte](4000))
- store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- var list1Get = store.getRemoteBytes("list1")
- assert(list1Get.isDefined, "list1Get expected to be fetched")
- // block manager exit
- store2.stop()
- store2 = null
- list1Get = store.getRemoteBytes("list1")
- // get `list1` block
- assert(list1Get.isDefined, "list1Get expected to be fetched")
- store3.stop()
- store3 = null
- // exception throw because there is no locations
- intercept[BlockFetchException] {
- list1Get = store.getRemoteBytes("list1")
- }
- } finally {
- origTimeoutOpt match {
- case Some(t) => conf.set("spark.network.timeout", t)
- case None => conf.remove("spark.network.timeout")
- }
+ conf.set("spark.shuffle.io.maxRetries", "0")
+ store = makeBlockManager(8000, "executor1")
+ store2 = makeBlockManager(8000, "executor2")
+ store3 = makeBlockManager(8000, "executor3")
+ val list1 = List(new Array[Byte](4000))
+ store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(store.getRemoteBytes("list1").isDefined, "list1 expected to be fetched")
+ store2.stop()
+ store2 = null
+ assert(store.getRemoteBytes("list1").isDefined, "list1 expected to be fetched")
+ store3.stop()
+ store3 = null
+ // exception thrown because there are no locations left
+ intercept[BlockFetchException] {
+ store.getRemoteBytes("list1")
}
}
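The other half of the patch, building a fresh SparkConf in beforeEach instead of mutating a shared instance, is a standard ScalaTest isolation pattern: per-test state is created in beforeEach and torn down in afterEach, so no setting can leak between tests. A self-contained sketch of the pattern under the ScalaTest 2.x imports Spark used at the time (the suite name and test body are illustrative, not code from this patch):

    import org.apache.spark.SparkConf
    import org.scalatest.{BeforeAndAfterEach, FunSuite}

    class FreshConfSuite extends FunSuite with BeforeAndAfterEach {
      // Rebuilt before every test; never shared or mutated across tests.
      private var conf: SparkConf = null

      override def beforeEach(): Unit = {
        super.beforeEach()
        conf = new SparkConf(false) // loadDefaults = false keeps system properties out
          .set("spark.app.id", "test")
          .set("spark.kryoserializer.buffer", "1m")
      }

      override def afterEach(): Unit = {
        try {
          conf = null // drop the reference; the next test builds its own
        } finally {
          super.afterEach()
        }
      }

      test("per-test settings stay local") {
        conf.set("spark.network.timeout", "2s") // visible only inside this test
        assert(conf.get("spark.app.id") === "test")
      }
    }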