diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 61 |
1 files changed, 51 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6fc32cb30a..a1c2933584 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._ import org.apache.spark._ import org.apache.spark.executor.DataReadMethod -import org.apache.spark.memory.{MemoryMode, StaticMemoryManager} +import org.apache.spark.memory.UnifiedMemoryManager import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.netty.NettyBlockTransferService @@ -74,10 +74,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE name: String = SparkContext.DRIVER_IDENTIFIER, master: BlockManagerMaster = this.master, transferService: Option[BlockTransferService] = Option.empty): BlockManager = { + conf.set("spark.testing.memory", maxMem.toString) + conf.set("spark.memory.offHeap.size", maxMem.toString) val serializer = new KryoSerializer(conf) val transfer = transferService - .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1)) - val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) + .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)) + val memManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(serializer, conf) val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) @@ -92,6 +94,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE System.setProperty("os.arch", "amd64") conf = new SparkConf(false) .set("spark.app.id", "test") + .set("spark.testing", "true") + .set("spark.memory.fraction", "1") + .set("spark.memory.storageFraction", "1") .set("spark.kryoserializer.buffer", "1m") .set("spark.test.useCompressedOops", "true") .set("spark.storage.unrollFraction", "0.4") @@ -485,7 +490,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockManager = makeBlockManager(128, "exec", bmMaster) val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) - assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) + assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost)) } test("SPARK-9591: getRemoteBytes from another location when Exception throw") { @@ -510,6 +515,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } + test("SPARK-14252: getOrElseUpdate should still read from remote storage") { + store = makeBlockManager(8000, "executor1") + store2 = makeBlockManager(8000, "executor2") + val list1 = List(new Array[Byte](4000)) + store2.putIterator( + "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + assert(store.getOrElseUpdate( + "list1", + StorageLevel.MEMORY_ONLY, + ClassTag.Any, + () => throw new AssertionError("attempted to compute locally")).isLeft) + } + test("in-memory LRU storage") { testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY) } @@ -518,6 +536,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER) } + test("in-memory LRU storage with off-heap") { + testInMemoryLRUStorage(StorageLevel( + useDisk = false, + useMemory = true, + useOffHeap = true, + deserialized = false, replication = 1)) + } + private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = { store = makeBlockManager(12000) val a1 = new Array[Byte](4000) @@ -608,6 +634,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true) } + test("disk and off-heap memory storage") { + testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false) + } + + test("disk and off-heap memory storage with getLocalBytes") { + testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true) + } + def testDiskAndMemoryStorage( storageLevel: StorageLevel, getAsBytes: Boolean): Unit = { @@ -817,12 +851,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("block store put failure") { // Use Java serializer so we can create an unserializable error. - val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val memoryManager = new StaticMemoryManager( - conf, - maxOnHeapExecutionMemory = Long.MaxValue, - maxOnHeapStorageMemory = 1200, - numCores = 1) + conf.set("spark.testing.memory", "1200") + val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1) + val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, @@ -928,6 +959,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(!store.diskStore.contains("list3"), "list3 was in disk store") assert(!store.diskStore.contains("list4"), "list4 was in disk store") assert(!store.diskStore.contains("list5"), "list5 was in disk store") + + // remove block - list2 should be removed from disk + val updatedBlocks6 = getUpdatedBlocks { + store.removeBlock( + "list2", tellMaster = true) + } + assert(updatedBlocks6.size === 1) + assert(updatedBlocks6.head._1 === TestBlockId("list2")) + assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE) + assert(!store.diskStore.contains("list2"), "list2 was in disk store") } test("query block statuses") { |