diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-02 17:25:38 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-02 17:25:38 -0700 |
commit | 6098f7e87a88d0b847c402b95510cb07352db643 (patch) | |
tree | 5aeee5022239a27ae3d7599099148bbe3b6d401b /core/src/test/scala | |
parent | 6112b1a83c99b4acc292c1e6280abfff01180ba5 (diff) | |
download | spark-6098f7e87a88d0b847c402b95510cb07352db643.tar.gz spark-6098f7e87a88d0b847c402b95510cb07352db643.tar.bz2 spark-6098f7e87a88d0b847c402b95510cb07352db643.zip |
Fixed cache replacement behavior of BlockManager:
- Partitions that get dropped to disk will now be loaded back into RAM
after they're accessed again
- Same-RDD rule for cache replacement is now implemented (don't drop
partitions from an RDD to make room for other partitions from itself)
- Items stored as MEMORY_AND_DISK go into memory only first, instead of
being eagerly written out to disk
- MemoryStore.ensureFreeSpace is called within a lock on the writer
thread to prevent race conditions (this can still be optimized to
allow multiple concurrent calls to it but it's a start)
- MemoryStore does not accept blocks larger than its limit
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 89 |
1 files changed, 83 insertions, 6 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index d15d7285a7..f6b8b49bff 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") - // Setting storage level of a1 and a2 to invalid; they should be removed from store and master - store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1)) - store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0)) + // Drop a1 and a2 from memory; this should be reported back to the master + store.dropFromMemory("a1", null) + store.dropFromMemory("a2", null) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") @@ -113,13 +113,58 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.getSingle("a1") === None, "a1 was in store") assert(store.getSingle("a2") != None, "a2 was not in store") // At this point a2 was gotten last, so LRU will getSingle rid of a3 - store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) Thread.sleep(100) assert(store.getSingle("a1") != None, "a1 was not in store") assert(store.getSingle("a2") != None, "a2 was not in store") - assert(store.getSingle("a3") === None, "a1 was in store") + assert(store.getSingle("a3") === None, "a3 was in store") } - + + test("in-memory LRU for partitions of same RDD") { + val store = new BlockManager(master, new KryoSerializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) + Thread.sleep(100) + // Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2 + // from the same RDD + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store") + // Check that rdd_0_3 doesn't replace them even after further accesses + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + } + + test("in-memory LRU for partitions of multiple RDDs") { + val store = new BlockManager(master, new KryoSerializer, 1200) + store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + Thread.sleep(100) + // At this point rdd_1_1 should've replaced rdd_0_1 + assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store") + assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") + assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + // Do a get() on rdd_0_2 so that it is the most recently used item + assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + // Put in more partitions from RDD 0; they should replace rdd_1_1 + store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + Thread.sleep(100) + // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped + // when we try to add rdd_0_4. + assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store") + assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") + assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store") + assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store") + } + test("on-disk storage") { val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) @@ -149,6 +194,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") } + test("disk and memory storage with getLocalBytes") { + val store = new BlockManager(master, new KryoSerializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) + Thread.sleep(100) + assert(store.getLocalBytes("a2") != None, "a2 was not in store") + assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getLocalBytes("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + test("disk and memory storage with serialization") { val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) @@ -165,6 +226,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") } + test("disk and memory storage with serialization and getLocalBytes") { + val store = new BlockManager(master, new KryoSerializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) + Thread.sleep(100) + assert(store.getLocalBytes("a2") != None, "a2 was not in store") + assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getLocalBytes("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + test("LRU with mixed storage levels") { val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) |