aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-05-31 01:48:16 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-05-31 01:48:16 -0700
commitba5e544461e8ca9216af703033f6b0de6dbc56ec (patch)
tree0046dd625fa30e6895f6b72aba5ef47b526389b9 /core/src/test/scala
parentf6ad3781b1d9a044789f114d13787b9d05223da3 (diff)
downloadspark-ba5e544461e8ca9216af703033f6b0de6dbc56ec.tar.gz
spark-ba5e544461e8ca9216af703033f6b0de6dbc56ec.tar.bz2
spark-ba5e544461e8ca9216af703033f6b0de6dbc56ec.zip
More block manager cleanup.
Implemented a removeRdd method in BlockManager, and use that to implement RDD.unpersist. Previously, unpersist needs to send B akka messages, where B = number of blocks. Now unpersist only needs to send W akka messages, where W = the number of workers.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala36
1 files changed, 22 insertions, 14 deletions
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index bff2475686..b9d5f9668e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -15,10 +15,10 @@ import org.scalatest.time.SpanSugar._
import spark.JavaSerializer
import spark.KryoSerializer
import spark.SizeEstimator
-import spark.Utils
import spark.util.AkkaUtils
import spark.util.ByteBufferInputStream
+
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
var store2: BlockManager = null
@@ -124,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
@@ -170,7 +170,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
@@ -218,7 +218,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
- master.removeRdd(0)
+ master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
store.getSingle("rdd_0_0") should be (None)
@@ -232,6 +232,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.getSingle("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
+
+ store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+ master.removeRdd(0, blocking = true)
+ store.getSingle("rdd_0_0") should be (None)
+ master.getLocations("rdd_0_0") should have size 0
+ store.getSingle("rdd_0_1") should be (None)
+ master.getLocations("rdd_0_1") should have size 0
}
test("reregistration on heart beat") {
@@ -262,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -280,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
@@ -490,9 +498,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -501,7 +509,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -516,9 +524,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -533,7 +541,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)