diff options
author | Charles Reiss <charles@eecs.berkeley.edu> | 2012-12-05 23:00:59 -0800 |
---|---|---|
committer | Charles Reiss <charles@eecs.berkeley.edu> | 2012-12-05 23:36:05 -0800 |
commit | a2a94fdbc755ccf1bea4600a273f214a624b3a98 (patch) | |
tree | 18b87c14bed63cda66b004127edc11096ad5e0a4 | |
parent | d21ca010ac14890065e559bab80f56830bb533a7 (diff) | |
download | spark-a2a94fdbc755ccf1bea4600a273f214a624b3a98.tar.gz spark-a2a94fdbc755ccf1bea4600a273f214a624b3a98.tar.bz2 spark-a2a94fdbc755ccf1bea4600a273f214a624b3a98.zip |
Tests for block manager heartbeats.
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index b9c19e61cd..1491818140 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -14,10 +14,12 @@ import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { var store: BlockManager = null + var store2: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null var oldArch: String = null var oldOops: String = null + var oldHeartBeat: String = null // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test val serializer = new KryoSerializer @@ -29,6 +31,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") oldOops = System.setProperty("spark.test.useCompressedOops", "true") + oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() } @@ -36,6 +39,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT after { if (store != null) { store.stop() + store = null + } + if (store2 != null) { + store2.stop() + store2 = null } actorSystem.shutdown() actorSystem.awaitTermination() @@ -85,6 +93,66 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") } + test("reregistration on heart beat") { + val heartBeat = PrivateMethod[Unit]('heartBeat) + store = new BlockManager(master, serializer, 2000) + val a1 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + master.notifyADeadHost(store.blockManagerId.ip) + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + + store invokePrivate heartBeat() + assert(master.mustGetLocations(GetLocations("a1")).size > 0, + "a1 was not reregistered with master") + } + + test("reregistration on block update") { + store = new BlockManager(master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + master.notifyADeadHost(store.blockManagerId.ip) + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + + store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY) + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, + "a1 was not reregistered with master") + assert(master.mustGetLocations(GetLocations("a2")).size > 0, + "master was not told about a2") + } + + test("deregistration on duplicate") { + val heartBeat = PrivateMethod[Unit]('heartBeat) + store = new BlockManager(master, serializer, 2000) + val a1 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + store2 = new BlockManager(master, serializer, 2000) + + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + + store invokePrivate heartBeat() + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + store2 invokePrivate heartBeat() + + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master") + } + test("in-memory LRU storage") { store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) |