diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-06-07 13:47:10 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-06-07 13:47:10 -0700 |
commit | c2c7299d7a4a65c79dc288fe7a398c3a9f67cd91 (patch) | |
tree | f6738bcbbb88d80248a16befd554e261049d56f6 /core | |
parent | 63051dd2bcc4bf09d413ff7cf89a37967edc33ba (diff) | |
download | spark-c2c7299d7a4a65c79dc288fe7a398c3a9f67cd91.tar.gz spark-c2c7299d7a4a65c79dc288fe7a398c3a9f67cd91.tar.bz2 spark-c2c7299d7a4a65c79dc288fe7a398c3a9f67cd91.zip |
Added BlockManagerSuite, which I'd forgotten to merge.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala new file mode 100644 index 0000000000..ea7e6ebbb1 --- /dev/null +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -0,0 +1,182 @@ +package spark.storage + +import spark.KryoSerializer + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter + +class BlockManagerSuite extends FunSuite with BeforeAndAfter{ + before { + BlockManagerMaster.startBlockManagerMaster(true, true) + } + + test("in-memory LRU storage") { + val store = new BlockManager(1000, new KryoSerializer) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_DESER) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_DESER) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + Thread.sleep(100) + assert(store.getSingle("a1") == None, "a1 was in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + // At this point a2 was gotten last, so LRU will getSingle rid of a3 + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + Thread.sleep(100) + assert(store.getSingle("a3") == None, "a3 was in store") + } + + test("in-memory LRU storage with serialization") { + val store = new BlockManager(1000, new KryoSerializer) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) + Thread.sleep(100) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1") == None, "a1 was in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + // At this point a2 was gotten last, so LRU will getSingle rid of a3 + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER) + Thread.sleep(100) + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") == None, "a1 was in store") + } + + test("on-disk storage") { + val store = new BlockManager(1000, new KryoSerializer) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.DISK_ONLY) + store.putSingle("a2", a2, StorageLevel.DISK_ONLY) + store.putSingle("a3", a3, StorageLevel.DISK_ONLY) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1") != None, "a1 was not in store") + } + + test("disk and memory storage") { + val store = new BlockManager(1000, new KryoSerializer) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.DISK_AND_MEMORY_DESER) + store.putSingle("a2", a2, StorageLevel.DISK_AND_MEMORY_DESER) + store.putSingle("a3", a3, StorageLevel.DISK_AND_MEMORY_DESER) + Thread.sleep(100) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1") != None, "a1 was not in store") + } + + test("disk and memory storage with serialization") { + val store = new BlockManager(1000, new KryoSerializer) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.DISK_AND_MEMORY) + store.putSingle("a2", a2, StorageLevel.DISK_AND_MEMORY) + store.putSingle("a3", a3, StorageLevel.DISK_AND_MEMORY) + Thread.sleep(100) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1") != None, "a1 was not in store") + } + + test("LRU with mixed storage levels") { + val store = new BlockManager(1000, new KryoSerializer) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + val a4 = new Array[Byte](400) + // First store a1 and a2, both in memory, and a3, on disk only + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.DISK_ONLY) + // At this point LRU should not kick in because a3 is only on disk + assert(store.getSingle("a1") != None, "a2 was not in store") + assert(store.getSingle("a2") != None, "a3 was not in store") + assert(store.getSingle("a3") != None, "a1 was not in store") + assert(store.getSingle("a1") != None, "a2 was not in store") + assert(store.getSingle("a2") != None, "a3 was not in store") + assert(store.getSingle("a3") != None, "a1 was not in store") + // Now let's add in a4, which uses both disk and memory; a1 should drop out + store.putSingle("a4", a4, StorageLevel.DISK_AND_MEMORY) + Thread.sleep(100) + assert(store.getSingle("a1") == None, "a1 was in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a4") != None, "a4 was not in store") + } + + test("in-memory LRU with streams") { + val store = new BlockManager(1000, new KryoSerializer) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list2 = List(new Array[Byte](200), new Array[Byte](200)) + val list3 = List(new Array[Byte](200), new Array[Byte](200)) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_DESER) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_DESER) + store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY_DESER) + Thread.sleep(100) + assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") != None, "list3 was not in store") + assert(store.get("list3").get.size == 2) + assert(store.get("list1") == None, "list1 was in store") + assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").get.size == 2) + // At this point list2 was gotten last, so LRU will getSingle rid of list3 + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_DESER) + Thread.sleep(100) + assert(store.get("list1") != None, "list1 was not in store") + assert(store.get("list1").get.size == 2) + assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") == None, "list1 was in store") + } + + test("LRU with mixed storage levels and streams") { + val store = new BlockManager(1000, new KryoSerializer) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list2 = List(new Array[Byte](200), new Array[Byte](200)) + val list3 = List(new Array[Byte](200), new Array[Byte](200)) + val list4 = List(new Array[Byte](200), new Array[Byte](200)) + // First store list1 and list2, both in memory, and list3, on disk only + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY) + store.put("list3", list3.iterator, StorageLevel.DISK_ONLY) + Thread.sleep(100) + // At this point LRU should not kick in because list3 is only on disk + assert(store.get("list1") != None, "list2 was not in store") + assert(store.get("list1").get.size == 2) + assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").get.size == 2) + assert(store.get("list1") != None, "list2 was not in store") + assert(store.get("list1").get.size == 2) + assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").get.size == 2) + // Now let's add in list4, which uses both disk and memory; list1 should drop out + store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY) + assert(store.get("list1") == None, "list1 was in store") + assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").get.size == 2) + assert(store.get("list4") != None, "list4 was not in store") + assert(store.get("list4").get.size == 2) + } +} |