aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-02 17:25:38 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-02 17:25:38 -0700
commit6098f7e87a88d0b847c402b95510cb07352db643 (patch)
tree5aeee5022239a27ae3d7599099148bbe3b6d401b /core/src/test/scala
parent6112b1a83c99b4acc292c1e6280abfff01180ba5 (diff)
downloadspark-6098f7e87a88d0b847c402b95510cb07352db643.tar.gz
spark-6098f7e87a88d0b847c402b95510cb07352db643.tar.bz2
spark-6098f7e87a88d0b847c402b95510cb07352db643.zip
Fixed cache replacement behavior of BlockManager:
- Partitions that get dropped to disk will now be loaded back into RAM after they're accessed again
- Same-RDD rule for cache replacement is now implemented (don't drop partitions from an RDD to make room for other partitions from itself)
- Items stored as MEMORY_AND_DISK go into memory only first, instead of being eagerly written out to disk
- MemoryStore.ensureFreeSpace is called within a lock on the writer thread to prevent race conditions (this can still be optimized to allow multiple concurrent calls to it but it's a start)
- MemoryStore does not accept blocks larger than its limit
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala89
1 files changed, 83 insertions, 6 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index d15d7285a7..f6b8b49bff 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
- // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
- store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
- store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
+ // Drop a1 and a2 from memory; this should be reported back to the master
+ store.dropFromMemory("a1", null)
+ store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
@@ -113,13 +113,58 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
Thread.sleep(100)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") === None, "a1 was in store")
+ assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
+ test("in-memory LRU for partitions of same RDD") { // Same-RDD rule: a partition must not evict partitions of its own RDD
+ val store = new BlockManager(master, new KryoSerializer, 1200) // NOTE(review): 1200 is presumably the memory limit in bytes; confirm
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) // third block of RDD 0; the asserts below expect it NOT to be stored
+ Thread.sleep(100)
+ // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
+ // from the same RDD
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+ assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+ // Check that rdd_0_3 doesn't replace them even after further accesses
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+ }
+
+ test("in-memory LRU for partitions of multiple RDDs") { // Eviction across RDDs is allowed; eviction within the same RDD is not
+ val store = new BlockManager(master, new KryoSerializer, 1200) // NOTE(review): 1200 is presumably the memory limit in bytes; confirm
+ store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ Thread.sleep(100)
+ // At this point rdd_1_1 should've replaced rdd_0_1
+ assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
+ assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+ assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+ // Do a get() on rdd_0_2 so that it is the most recently used item
+ assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+ // Put in more partitions from RDD 0; they should replace rdd_1_1
+ store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ Thread.sleep(100)
+ // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
+ // when we try to add rdd_0_4.
+ assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
+ assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+ assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store") // rdd_0_4 is the block expected to be rejected
+ assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+ assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+ }
+
test("on-disk storage") {
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
@@ -149,6 +194,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
}
+ test("disk and memory storage with getLocalBytes") { // MEMORY_AND_DISK blocks read back through getLocalBytes
+ val store = new BlockManager(master, new KryoSerializer, 1200) // NOTE(review): 1200 is presumably the memory limit in bytes; confirm
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+ Thread.sleep(100)
+ assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+ assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") // a1 is expected to be out of the memory store here
+ assert(store.getLocalBytes("a1") != None, "a1 was not in store") // a1 must still be readable from the block manager
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") // the read above is expected to put a1 back in memory
+ }
+
test("disk and memory storage with serialization") {
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)
@@ -165,6 +226,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
}
+ test("disk and memory storage with serialization and getLocalBytes") { // MEMORY_AND_DISK_SER blocks read back through getLocalBytes
+ val store = new BlockManager(master, new KryoSerializer, 1200) // NOTE(review): 1200 is presumably the memory limit in bytes; confirm
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+ Thread.sleep(100)
+ assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+ assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") // a1 is expected to be out of the memory store here
+ assert(store.getLocalBytes("a1") != None, "a1 was not in store") // a1 must still be readable from the block manager
+ assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") // the read above is expected to put a1 back in memory
+ }
+
test("LRU with mixed storage levels") {
val store = new BlockManager(master, new KryoSerializer, 1200)
val a1 = new Array[Byte](400)