aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala30
3 files changed, 26 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dcf359e3c2..b38e2ec57f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -495,10 +495,9 @@ private[spark] class BlockManager(
}
} else {
// Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asBlockResult) {
+ if (!level.deserialized && !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized", or if it should be cached as objects in memory
- * but we only requested its serialized bytes. */
+ * "memory serialized" and we requested its serialized bytes. */
memoryStore.putBytes(blockId, bytes.limit, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 12b70d1807..bb72fe4bca 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -88,6 +88,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -106,6 +107,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* The caller should guarantee that `size` is correct.
*/
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
@@ -118,6 +120,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel): Either[Iterator[Any], Long] = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
putIterator(blockId, values, level, allowPersistToDisk = true)
}
@@ -138,6 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
values: Iterator[Any],
level: StorageLevel,
allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cfcbf1745d..0485b0501c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -585,36 +585,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
test("disk and memory storage") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
}
test("disk and memory storage with getLocalBytes") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
}
test("disk and memory storage with serialization") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
}
test("disk and memory storage with serialization and getLocalBytes") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
}
def testDiskAndMemoryStorage(
storageLevel: StorageLevel,
- accessMethod: BlockManager => BlockId => Option[_]): Unit = {
+ getAsBytes: Boolean): Unit = {
store = makeBlockManager(12000)
+ val accessMethod =
+ if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, storageLevel)
store.putSingle("a2", a2, storageLevel)
store.putSingle("a3", a3, storageLevel)
- assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
- assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
- assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
- assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
- assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+ assert(accessMethod("a2").isDefined, "a2 was not in store")
+ assert(accessMethod("a3").isDefined, "a3 was not in store")
+ assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+ assert(accessMethod("a1").isDefined, "a1 was not in store")
+ val dataShouldHaveBeenCachedBackIntoMemory = {
+ if (storageLevel.deserialized) !getAsBytes
+ else getAsBytes
+ }
+ if (dataShouldHaveBeenCachedBackIntoMemory) {
+ assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
+ } else {
+ assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+ }
}
test("LRU with mixed storage levels") {