Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala')
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 61 +++++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 51 insertions(+), 10 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6fc32cb30a..a1c2933584 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
+import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -74,10 +74,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
name: String = SparkContext.DRIVER_IDENTIFIER,
master: BlockManagerMaster = this.master,
transferService: Option[BlockTransferService] = Option.empty): BlockManager = {
+ conf.set("spark.testing.memory", maxMem.toString)
+ conf.set("spark.memory.offHeap.size", maxMem.toString)
val serializer = new KryoSerializer(conf)
val transfer = transferService
- .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+ .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
+ val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
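
Note: this hunk replaces the fixed-size StaticMemoryManager with UnifiedMemoryManager, whose companion apply() sizes the pool from the conf rather than from an explicit maxMem argument; the test therefore pins the budget through spark.testing.memory (and mirrors it for the off-heap pool via spark.memory.offHeap.size). A minimal sketch of the construction, assuming the 2.x-era API and a hypothetical 8000-byte budget:

    import org.apache.spark.SparkConf
    import org.apache.spark.memory.UnifiedMemoryManager

    val conf = new SparkConf(false)
      .set("spark.testing", "true")          // makes apply() skip the reserved-memory floor
      .set("spark.testing.memory", "8000")   // total budget, instead of Runtime.maxMemory
      .set("spark.memory.fraction", "1")
      .set("spark.memory.storageFraction", "1")
    val memManager = UnifiedMemoryManager(conf, numCores = 1)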
@@ -92,6 +94,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
System.setProperty("os.arch", "amd64")
conf = new SparkConf(false)
.set("spark.app.id", "test")
+ .set("spark.testing", "true")
+ .set("spark.memory.fraction", "1")
+ .set("spark.memory.storageFraction", "1")
.set("spark.kryoserializer.buffer", "1m")
.set("spark.test.useCompressedOops", "true")
.set("spark.storage.unrollFraction", "0.4")
@@ -485,7 +490,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockManager = makeBlockManager(128, "exec", bmMaster)
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
- assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+ assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
}
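
The rewritten assertion compares host sets, making the check order-insensitive. Note that Set collapses duplicates, so the expected value is really the two-element Set(localHost, otherHost), and the assertion no longer verifies that two of the three locations are local. For example:

    assert(Set("a", "a", "b") == Set("a", "b"))   // passes: duplicates collapse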
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
@@ -510,6 +515,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}
+ test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
+ store = makeBlockManager(8000, "executor1")
+ store2 = makeBlockManager(8000, "executor2")
+ val list1 = List(new Array[Byte](4000))
+ store2.putIterator(
+ "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ assert(store.getOrElseUpdate(
+ "list1",
+ StorageLevel.MEMORY_ONLY,
+ ClassTag.Any,
+ () => throw new AssertionError("attempted to compute locally")).isLeft)
+ }
+
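
The new test relies on getOrElseUpdate returning an Either: Left when the block was served from storage (here, fetched remotely from executor2), Right with a locally computed iterator otherwise, so the throwing maker function proves no local computation happened. A sketch of that contract, assuming the 2.x signature:

    import org.apache.spark.storage.BlockResult

    def describe[T](result: Either[BlockResult, Iterator[T]]): String = result match {
      case Left(blockResult) => s"served from storage via ${blockResult.readMethod}"
      case Right(iter)       => s"computed locally (${iter.size} elements)"
    }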
test("in-memory LRU storage") {
testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY)
}
@@ -518,6 +536,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
}
+ test("in-memory LRU storage with off-heap") {
+ testInMemoryLRUStorage(StorageLevel(
+ useDisk = false,
+ useMemory = true,
+ useOffHeap = true,
+ deserialized = false, replication = 1))
+ }
+
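
The level is built by hand rather than using StorageLevel.OFF_HEAP because, after this change, the OFF_HEAP constant also enables disk spill, while this test wants pure off-heap memory. A sketch of the distinction (flag values assumed from the 2.x constants):

    import org.apache.spark.storage.StorageLevel

    val memOnlyOffHeap = StorageLevel(
      useDisk = false, useMemory = true, useOffHeap = true,
      deserialized = false, replication = 1)
    assert(memOnlyOffHeap.useOffHeap && !memOnlyOffHeap.useDisk)
    assert(StorageLevel.OFF_HEAP.useDisk)   // assumed: 2.x OFF_HEAP spills to disk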
private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
@@ -608,6 +634,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
}
+ test("disk and off-heap memory storage") {
+ testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false)
+ }
+
+ test("disk and off-heap memory storage with getLocalBytes") {
+ testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true)
+ }
+
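
Reusing testDiskAndMemoryStorage with StorageLevel.OFF_HEAP exercises eviction from the off-heap memory store down to disk; the getAsBytes flag selects which local read path is verified. A sketch of the distinction, assuming the 2.x BlockManager API:

    import org.apache.spark.storage.{BlockId, BlockManager}

    def readBack(store: BlockManager, id: BlockId, getAsBytes: Boolean): Boolean =
      if (getAsBytes) store.getLocalBytes(id).isDefined   // raw serialized bytes
      else store.getLocalValues(id).isDefined             // deserialized values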
def testDiskAndMemoryStorage(
storageLevel: StorageLevel,
getAsBytes: Boolean): Unit = {
@@ -817,12 +851,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val memoryManager = new StaticMemoryManager(
- conf,
- maxOnHeapExecutionMemory = Long.MaxValue,
- maxOnHeapStorageMemory = 1200,
- numCores = 1)
+ conf.set("spark.testing.memory", "1200")
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+ val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
serializerManager, conf, memoryManager, mapOutputTracker,
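
With spark.memory.fraction and spark.memory.storageFraction both set to 1 in this suite's conf, pinning spark.testing.memory to 1200 gives the unified manager the same 1200-byte storage ceiling the removed StaticMemoryManager enforced through maxOnHeapStorageMemory. Sketch:

    conf.set("spark.testing.memory", "1200")               // whole budget
    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
    // fraction = 1 and storageFraction = 1  =>  storage pool = 1200 bytes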
@@ -928,6 +959,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(!store.diskStore.contains("list3"), "list3 was in disk store")
assert(!store.diskStore.contains("list4"), "list4 was in disk store")
assert(!store.diskStore.contains("list5"), "list5 was in disk store")
+
+ // remove block - list2 should be removed from disk
+ val updatedBlocks6 = getUpdatedBlocks {
+ store.removeBlock(
+ "list2", tellMaster = true)
+ }
+ assert(updatedBlocks6.size === 1)
+ assert(updatedBlocks6.head._1 === TestBlockId("list2"))
+ assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
+ assert(!store.diskStore.contains("list2"), "list2 was in disk store")
}
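
A removed block is reported exactly once with StorageLevel.NONE, the level whose flags are all off, signalling that the block no longer exists in any store. Sketch of those flags (assumed from the standard constants):

    assert(!StorageLevel.NONE.useMemory && !StorageLevel.NONE.useDisk &&
      !StorageLevel.NONE.useOffHeap && StorageLevel.NONE.replication == 1)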
test("query block statuses") {