diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 666 |
1 files changed, 666 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala new file mode 100644 index 0000000000..88ba10f2f2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -0,0 +1,666 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.nio.ByteBuffer + +import akka.actor._ + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.matchers.ShouldMatchers._ +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.JavaSerializer +import org.apache.spark.KryoSerializer +import org.apache.spark.SizeEstimator +import org.apache.spark.Utils +import org.apache.spark.util.AkkaUtils +import org.apache.spark.util.ByteBufferInputStream + + +class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { + var store: BlockManager = null + var store2: BlockManager = null + var actorSystem: ActorSystem = null + var master: BlockManagerMaster = null + var oldArch: String = null + var oldOops: String = null + var oldHeartBeat: String = null + + // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test + System.setProperty("spark.kryoserializer.buffer.mb", "1") + val serializer = new KryoSerializer + + before { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0) + this.actorSystem = actorSystem + System.setProperty("spark.driver.port", boundPort.toString) + System.setProperty("spark.hostPort", "localhost:" + boundPort) + + master = new BlockManagerMaster( + actorSystem.actorOf(Props(new BlockManagerMasterActor(true)))) + + // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case + oldArch = System.setProperty("os.arch", "amd64") + oldOops = System.setProperty("spark.test.useCompressedOops", "true") + oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true") + val initialize = PrivateMethod[Unit]('initialize) + SizeEstimator invokePrivate initialize() + // Set some value ... + System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111) + } + + after { + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + + if (store != null) { + store.stop() + store = null + } + if (store2 != null) { + store2.stop() + store2 = null + } + actorSystem.shutdown() + actorSystem.awaitTermination() + actorSystem = null + master = null + + if (oldArch != null) { + System.setProperty("os.arch", oldArch) + } else { + System.clearProperty("os.arch") + } + + if (oldOops != null) { + System.setProperty("spark.test.useCompressedOops", oldOops) + } else { + System.clearProperty("spark.test.useCompressedOops") + } + } + + test("StorageLevel object caching") { + val level1 = StorageLevel(false, false, false, 3) + val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1 + val level3 = StorageLevel(false, false, false, 2) // this should return a different object + assert(level2 === level1, "level2 is not same as level1") + assert(level2.eq(level1), "level2 is not the same object as level1") + assert(level3 != level1, "level3 is same as level1") + val bytes1 = Utils.serialize(level1) + val level1_ = Utils.deserialize[StorageLevel](bytes1) + val bytes2 = Utils.serialize(level2) + val level2_ = Utils.deserialize[StorageLevel](bytes2) + assert(level1_ === level1, "Deserialized level1 not same as original level1") + assert(level1_.eq(level1), "Deserialized level1 not the same object as original level2") + assert(level2_ === level2, "Deserialized level2 not same as original level2") + assert(level2_.eq(level1), "Deserialized level2 not the same object as original level1") + } + + test("BlockManagerId object caching") { + val id1 = BlockManagerId("e1", "XXX", 1, 0) + val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1 + val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object + assert(id2 === id1, "id2 is not same as id1") + assert(id2.eq(id1), "id2 is not the same object as id1") + assert(id3 != id1, "id3 is same as id1") + val bytes1 = Utils.serialize(id1) + val id1_ = Utils.deserialize[BlockManagerId](bytes1) + val bytes2 = Utils.serialize(id2) + val id2_ = Utils.deserialize[BlockManagerId](bytes2) + assert(id1_ === id1, "Deserialized id1 is not same as original id1") + assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1") + assert(id2_ === id2, "Deserialized id2 is not same as original id2") + assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1") + } + + test("master + 1 manager interaction") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + + // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) + + // Checking whether blocks are in memory + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + + // Checking whether master knows about the blocks or not + assert(master.getLocations("a1").size > 0, "master was not told about a1") + assert(master.getLocations("a2").size > 0, "master was not told about a2") + assert(master.getLocations("a3").size === 0, "master was told about a3") + + // Drop a1 and a2 from memory; this should be reported back to the master + store.dropFromMemory("a1", null) + store.dropFromMemory("a2", null) + assert(store.getSingle("a1") === None, "a1 not removed from store") + assert(store.getSingle("a2") === None, "a2 not removed from store") + assert(master.getLocations("a1").size === 0, "master did not remove a1") + assert(master.getLocations("a2").size === 0, "master did not remove a2") + } + + test("master + 2 managers interaction") { + store = new BlockManager("exec1", actorSystem, master, serializer, 2000) + store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000) + + val peers = master.getPeers(store.blockManagerId, 1) + assert(peers.size === 1, "master did not return the other manager as a peer") + assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager") + + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2) + store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1") + assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2") + } + + test("removing block") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + + // Putting a1, a2 and a3 in memory and telling master only about a1 and a2 + store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false) + + // Checking whether blocks are in memory and memory size + val memStatus = master.getMemoryStatus.head._2 + assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000") + assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200") + assert(store.getSingle("a1-to-remove") != None, "a1 was not in store") + assert(store.getSingle("a2-to-remove") != None, "a2 was not in store") + assert(store.getSingle("a3-to-remove") != None, "a3 was not in store") + + // Checking whether master knows about the blocks or not + assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1") + assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2") + assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3") + + // Remove a1 and a2 and a3. Should be no-op for a3. + master.removeBlock("a1-to-remove") + master.removeBlock("a2-to-remove") + master.removeBlock("a3-to-remove") + + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("a1-to-remove") should be (None) + master.getLocations("a1-to-remove") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("a2-to-remove") should be (None) + master.getLocations("a2-to-remove") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("a3-to-remove") should not be (None) + master.getLocations("a3-to-remove") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + val memStatus = master.getMemoryStatus.head._2 + memStatus._1 should equal (2000L) + memStatus._2 should equal (2000L) + } + } + + test("removing rdd") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + // Putting a1, a2 and a3 in memory. + store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) + master.removeRdd(0, blocking = false) + + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_0") should be (None) + master.getLocations("rdd_0_0") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_1") should be (None) + master.getLocations("rdd_0_1") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("nonrddblock") should not be (None) + master.getLocations("nonrddblock") should have size (1) + } + + store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + master.removeRdd(0, blocking = true) + store.getSingle("rdd_0_0") should be (None) + master.getLocations("rdd_0_0") should have size 0 + store.getSingle("rdd_0_1") should be (None) + master.getLocations("rdd_0_1") should have size 0 + } + + test("reregistration on heart beat") { + val heartBeat = PrivateMethod[Unit]('heartBeat) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(master.getLocations("a1").size > 0, "master was not told about a1") + + master.removeExecutor(store.blockManagerId.executorId) + assert(master.getLocations("a1").size == 0, "a1 was not removed from master") + + store invokePrivate heartBeat() + assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") + } + + test("reregistration on block update") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + assert(master.getLocations("a1").size > 0, "master was not told about a1") + + master.removeExecutor(store.blockManagerId.executorId) + assert(master.getLocations("a1").size == 0, "a1 was not removed from master") + + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.waitForAsyncReregister() + + assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master") + assert(master.getLocations("a2").size > 0, "master was not told about a2") + } + + test("reregistration doesn't dead lock") { + val heartBeat = PrivateMethod[Unit]('heartBeat) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = List(new Array[Byte](400)) + + // try many times to trigger any deadlocks + for (i <- 1 to 100) { + master.removeExecutor(store.blockManagerId.executorId) + val t1 = new Thread { + override def run() { + store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } + } + val t2 = new Thread { + override def run() { + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + } + } + val t3 = new Thread { + override def run() { + store invokePrivate heartBeat() + } + } + + t1.start() + t2.start() + t3.start() + t1.join() + t2.join() + t3.join() + + store.dropFromMemory("a1", null) + store.dropFromMemory("a2", null) + store.waitForAsyncReregister() + } + } + + test("in-memory LRU storage") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1") === None, "a1 was in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + // At this point a2 was gotten last, so LRU will getSingle rid of a3 + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") === None, "a3 was in store") + } + + test("in-memory LRU storage with serialization") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a1") === None, "a1 was in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + // At this point a2 was gotten last, so LRU will getSingle rid of a3 + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") === None, "a3 was in store") + } + + test("in-memory LRU for partitions of same RDD") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY) + // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2 + // from the same RDD + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store") + // Check that rdd_0_3 doesn't replace them even after further accesses + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store") + } + + test("in-memory LRU for partitions of multiple RDDs") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // At this point rdd_1_1 should've replaced rdd_0_1 + assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store") + assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") + assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + // Do a get() on rdd_0_2 so that it is the most recently used item + assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store") + // Put in more partitions from RDD 0; they should replace rdd_1_1 + store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY) + // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped + // when we try to add rdd_0_4. + assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store") + assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store") + assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store") + assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store") + assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store") + } + + test("on-disk storage") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.DISK_ONLY) + store.putSingle("a2", a2, StorageLevel.DISK_ONLY) + store.putSingle("a3", a3, StorageLevel.DISK_ONLY) + assert(store.getSingle("a2") != None, "a2 was in store") + assert(store.getSingle("a3") != None, "a3 was in store") + assert(store.getSingle("a1") != None, "a1 was in store") + } + + test("disk and memory storage") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + + test("disk and memory storage with getLocalBytes") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK) + assert(store.getLocalBytes("a2") != None, "a2 was not in store") + assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getLocalBytes("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + + test("disk and memory storage with serialization") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + + test("disk and memory storage with serialization and getLocalBytes") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER) + store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER) + assert(store.getLocalBytes("a2") != None, "a2 was not in store") + assert(store.getLocalBytes("a3") != None, "a3 was not in store") + assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store") + assert(store.getLocalBytes("a1") != None, "a1 was not in store") + assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store") + } + + test("LRU with mixed storage levels") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + val a4 = new Array[Byte](400) + // First store a1 and a2, both in memory, and a3, on disk only + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER) + store.putSingle("a3", a3, StorageLevel.DISK_ONLY) + // At this point LRU should not kick in because a3 is only on disk + assert(store.getSingle("a1") != None, "a2 was not in store") + assert(store.getSingle("a2") != None, "a3 was not in store") + assert(store.getSingle("a3") != None, "a1 was not in store") + assert(store.getSingle("a1") != None, "a2 was not in store") + assert(store.getSingle("a2") != None, "a3 was not in store") + assert(store.getSingle("a3") != None, "a1 was not in store") + // Now let's add in a4, which uses both disk and memory; a1 should drop out + store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER) + assert(store.getSingle("a1") == None, "a1 was in store") + assert(store.getSingle("a2") != None, "a2 was not in store") + assert(store.getSingle("a3") != None, "a3 was not in store") + assert(store.getSingle("a4") != None, "a4 was not in store") + } + + test("in-memory LRU with streams") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list2 = List(new Array[Byte](200), new Array[Byte](200)) + val list3 = List(new Array[Byte](200), new Array[Byte](200)) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") != None, "list3 was not in store") + assert(store.get("list3").get.size == 2) + assert(store.get("list1") === None, "list1 was in store") + assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").get.size == 2) + // At this point list2 was gotten last, so LRU will getSingle rid of list3 + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(store.get("list1") != None, "list1 was not in store") + assert(store.get("list1").get.size == 2) + assert(store.get("list2") != None, "list2 was not in store") + assert(store.get("list2").get.size == 2) + assert(store.get("list3") === None, "list1 was in store") + } + + test("LRU with mixed storage levels and streams") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list2 = List(new Array[Byte](200), new Array[Byte](200)) + val list3 = List(new Array[Byte](200), new Array[Byte](200)) + val list4 = List(new Array[Byte](200), new Array[Byte](200)) + // First store list1 and list2, both in memory, and list3, on disk only + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) + store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + // At this point LRU should not kick in because list3 is only on disk + assert(store.get("list1") != None, "list2 was not in store") + assert(store.get("list1").get.size === 2) + assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").get.size === 2) + assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").get.size === 2) + assert(store.get("list1") != None, "list2 was not in store") + assert(store.get("list1").get.size === 2) + assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").get.size === 2) + assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").get.size === 2) + // Now let's add in list4, which uses both disk and memory; list1 should drop out + store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) + assert(store.get("list1") === None, "list1 was in store") + assert(store.get("list2") != None, "list3 was not in store") + assert(store.get("list2").get.size === 2) + assert(store.get("list3") != None, "list1 was not in store") + assert(store.get("list3").get.size === 2) + assert(store.get("list4") != None, "list4 was not in store") + assert(store.get("list4").get.size === 2) + } + + test("negative byte values in ByteBufferInputStream") { + val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte).toArray) + val stream = new ByteBufferInputStream(buffer) + val temp = new Array[Byte](10) + assert(stream.read() === 254, "unexpected byte read") + assert(stream.read() === 255, "unexpected byte read") + assert(stream.read() === 0, "unexpected byte read") + assert(stream.read(temp, 0, temp.length) === 2, "unexpected number of bytes read") + assert(stream.read() === -1, "end of stream not signalled") + assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled") + } + + test("overly large block") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 500) + store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.getSingle("a1") === None, "a1 was in store") + store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) + assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store") + assert(store.getSingle("a2") != None, "a2 was not in store") + } + + test("block compression") { + try { + System.setProperty("spark.shuffle.compress", "true") + store = new BlockManager("exec1", actorSystem, master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.shuffle.compress", "false") + store = new BlockManager("exec2", actorSystem, master, serializer, 2000) + store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "true") + store = new BlockManager("exec3", actorSystem, master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.broadcast.compress", "false") + store = new BlockManager("exec4", actorSystem, master, serializer, 2000) + store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "true") + store = new BlockManager("exec5", actorSystem, master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") + store.stop() + store = null + + System.setProperty("spark.rdd.compress", "false") + store = new BlockManager("exec6", actorSystem, master, serializer, 2000) + store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) + assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") + store.stop() + store = null + + // Check that any other block types are also kept uncompressed + store = new BlockManager("exec7", actorSystem, master, serializer, 2000) + store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) + assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") + store.stop() + store = null + } finally { + System.clearProperty("spark.shuffle.compress") + System.clearProperty("spark.broadcast.compress") + System.clearProperty("spark.rdd.compress") + } + } + + test("block store put failure") { + // Use Java serializer so we can create an unserializable error. + store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200) + + // The put should fail since a1 is not serializable. + class UnserializableClass + val a1 = new UnserializableClass + intercept[java.io.NotSerializableException] { + store.putSingle("a1", a1, StorageLevel.DISK_ONLY) + } + + // Make sure get a1 doesn't hang and returns None. + failAfter(1 second) { + assert(store.getSingle("a1") == None, "a1 should not be in store") + } + } +} |