aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2012-12-05 23:00:59 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2012-12-05 23:36:05 -0800
commita2a94fdbc755ccf1bea4600a273f214a624b3a98 (patch)
tree18b87c14bed63cda66b004127edc11096ad5e0a4 /core
parentd21ca010ac14890065e559bab80f56830bb533a7 (diff)
downloadspark-a2a94fdbc755ccf1bea4600a273f214a624b3a98.tar.gz
spark-a2a94fdbc755ccf1bea4600a273f214a624b3a98.tar.bz2
spark-a2a94fdbc755ccf1bea4600a273f214a624b3a98.zip
Tests for block manager heartbeats.
Diffstat (limited to 'core')
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala68
1 files changed, 68 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index b9c19e61cd..1491818140 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -14,10 +14,12 @@ import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
+ var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
var oldOops: String = null
+ var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer
@@ -29,6 +31,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+ oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
@@ -36,6 +39,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
after {
if (store != null) {
store.stop()
+ store = null
+ }
+ if (store2 != null) {
+ store2.stop()
+ store2 = null
}
actorSystem.shutdown()
actorSystem.awaitTermination()
@@ -85,6 +93,66 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
}
+ test("reregistration on heart beat") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
+ store = new BlockManager(master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+ store invokePrivate heartBeat()
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+ "a1 was not reregistered with master")
+ }
+
+ test("reregistration on block update") {
+ store = new BlockManager(master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+ store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+ "a1 was not reregistered with master")
+ assert(master.mustGetLocations(GetLocations("a2")).size > 0,
+ "master was not told about a2")
+ }
+
+ test("deregistration on duplicate") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
+ store = new BlockManager(master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ store2 = new BlockManager(master, serializer, 2000)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+ store invokePrivate heartBeat()
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ store2 invokePrivate heartBeat()
+
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
+ }
+
test("in-memory LRU storage") {
store = new BlockManager(master, serializer, 1200)
val a1 = new Array[Byte](400)