about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-08 10:40:27 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-03-08 10:40:27 -0800
commitad3c9a9730535faa6d270ee83412f79ec3db8333 (patch)
tree529e1b79fad5e263068528ae67b22443e69f2bc8 /core
parent78d3b6051eea1b5fa9b1a4caa186214b8cca17bf (diff)
downloadspark-ad3c9a9730535faa6d270ee83412f79ec3db8333.tar.gz
spark-ad3c9a9730535faa6d270ee83412f79ec3db8333.tar.bz2
spark-ad3c9a9730535faa6d270ee83412f79ec3db8333.zip
[SPARK-13695] Don't cache MEMORY_AND_DISK blocks as bytes in memory after spills
When a cached block is spilled to disk and read back in serialized form (i.e. as bytes), the current BlockManager implementation will attempt to re-insert the serialized block into the MemoryStore even if the block's storage level requests deserialized caching. This behavior adds some complexity to the MemoryStore but I don't think it offers many performance benefits and I'd like to remove it in order to simplify a larger refactoring patch. Therefore, this patch changes the behavior so that disk store reads will only cache bytes in the memory store for blocks with serialized storage levels. There are two places where we request serialized bytes from the BlockStore: 1. getLocalBytes(), which is only called when reading local copies of TorrentBroadcast pieces. Broadcast pieces are always cached using a serialized storage level, so this won't lead to a mismatch in serialization forms if spilled bytes read from disk are cached as bytes in the memory store. 2. the non-shuffle-block branch in getBlockData(), which is only called by the NettyBlockRpcServer when responding to requests to read remote blocks. Caching the serialized bytes in memory will only benefit us if those cached bytes are read before they're evicted and the likelihood of that happening seems low since the frequency of remote reads of non-broadcast cached blocks seems very low. Caching these bytes when they have a low probability of being read is bad if it risks the eviction of blocks which are cached in their expected serialized/deserialized forms, since those blocks seem more likely to be read in local computation. Given the argument above, I think this change is unlikely to cause performance regressions. Author: Josh Rosen <joshrosen@databricks.com> Closes #11533 from JoshRosen/remove-memorystore-level-mismatch.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala30
3 files changed, 26 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dcf359e3c2..b38e2ec57f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -495,10 +495,9 @@ private[spark] class BlockManager(
}
} else {
// Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asBlockResult) {
+ if (!level.deserialized && !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized", or if it should be cached as objects in memory
- * but we only requested its serialized bytes. */
+ * "memory serialized" and we requested its serialized bytes. */
memoryStore.putBytes(blockId, bytes.limit, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 12b70d1807..bb72fe4bca 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -88,6 +88,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -106,6 +107,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* The caller should guarantee that `size` is correct.
*/
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
@@ -118,6 +120,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel): Either[Iterator[Any], Long] = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
putIterator(blockId, values, level, allowPersistToDisk = true)
}
@@ -138,6 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
values: Iterator[Any],
level: StorageLevel,
allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+ require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cfcbf1745d..0485b0501c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -585,36 +585,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
test("disk and memory storage") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
}
test("disk and memory storage with getLocalBytes") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
}
test("disk and memory storage with serialization") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
}
test("disk and memory storage with serialization and getLocalBytes") {
- testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
}
def testDiskAndMemoryStorage(
storageLevel: StorageLevel,
- accessMethod: BlockManager => BlockId => Option[_]): Unit = {
+ getAsBytes: Boolean): Unit = {
store = makeBlockManager(12000)
+ val accessMethod =
+ if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, storageLevel)
store.putSingle("a2", a2, storageLevel)
store.putSingle("a3", a3, storageLevel)
- assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
- assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
- assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
- assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
- assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+ assert(accessMethod("a2").isDefined, "a2 was not in store")
+ assert(accessMethod("a3").isDefined, "a3 was not in store")
+ assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+ assert(accessMethod("a1").isDefined, "a1 was not in store")
+ val dataShouldHaveBeenCachedBackIntoMemory = {
+ if (storageLevel.deserialized) !getAsBytes
+ else getAsBytes
+ }
+ if (dataShouldHaveBeenCachedBackIntoMemory) {
+ assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
+ } else {
+ assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+ }
}
test("LRU with mixed storage levels") {