author    zsxwing <zsxwing@gmail.com>    2015-03-25 12:09:30 -0700
committer Andrew Or <andrew@databricks.com>    2015-03-25 12:17:18 -0700
commit    883b7e9030e1a3948acee17608e51dcd9f4d55e1 (patch)
tree      7930b43272ade5c6b6561eaa7d464c2c2607a165 /core
parent    968408b345a0e26f7ee9105a6a0c3456cf10576a (diff)
[SPARK-6076][Block Manager] Fix a potential OOM issue when StorageLevel is MEMORY_AND_DISK_SER
In https://github.com/apache/spark/blob/dcd1e42d6b6ac08d2c0736bf61a15f515a1f222b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L538, when the StorageLevel is `MEMORY_AND_DISK_SER`, the block's content is copied from the file into memory and then put into the MemoryStore:

```scala
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
bytes.rewind()
```

However, if the file is bigger than the free memory, an OOM occurs. A better approach is to first test whether there is enough memory; if there is not, `copyForMemory` should not be created at all, since caching the bytes in memory is only an optional optimization here.

Author: zsxwing <zsxwing@gmail.com>

Closes #4827 from zsxwing/SPARK-6076 and squashes the following commits:

7d25545 [zsxwing] Add alias for tryToPut and dropFromMemory
1100a54 [zsxwing] Replace call-by-name with () => T
0cc0257 [zsxwing] Fix a potential OOM issue when StorageLevel is MEMORY_AND_DISK_SER
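For readers skimming the patch below: the essence of the fix is deferring the copy behind a `() => ByteBuffer` thunk that the store invokes only after its free-space check passes. A minimal, self-contained sketch of that pattern (the names `hasEnoughMemory`, `putBytesLazily`, and `cacheFromDisk` are illustrative stand-ins, not Spark APIs; rewinding and accounting details are elided):

```scala
import java.nio.ByteBuffer

// Illustrative stand-in for the MemoryStore's free-space accounting.
def hasEnoughMemory(size: Long): Boolean =
  size <= Runtime.getRuntime.freeMemory()

// Post-fix shape: the buffer is produced by a thunk that is invoked only
// after the size check passes, so an oversized block never allocates.
def putBytesLazily(size: Long, makeBytes: () => ByteBuffer): Option[ByteBuffer] =
  if (hasEnoughMemory(size)) Some(makeBytes()) // allocation happens only here
  else None                                    // thunk never called: no OOM from the copy

// Usage mirroring the patched BlockManager call site, where `fileBytes`
// stands in for the bytes just read back from disk.
def cacheFromDisk(fileBytes: ByteBuffer): Unit = {
  putBytesLazily(fileBytes.limit.toLong, () => {
    val copyForMemory = ByteBuffer.allocate(fileBytes.limit)
    copyForMemory.put(fileBytes)
    copyForMemory
  })
  fileBytes.rewind()
}
```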
Diffstat (limited to 'core')
core/src/main/scala/org/apache/spark/storage/BlockManager.scala      | 23
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala       | 43
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 34
3 files changed, 85 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 80d66e5913..1dff09a75d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,9 +535,14 @@ private[spark] class BlockManager(
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- memoryStore.putBytes(blockId, copyForMemory, level)
+ memoryStore.putBytes(blockId, bytes.limit, () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+ // put it into MemoryStore, copyForMemory should not be created. That's why this
+ // action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ })
bytes.rewind()
}
if (!asBlockResult) {
@@ -991,15 +996,23 @@ private[spark] class BlockManager(
putIterator(blockId, Iterator(value), level, tellMaster)
}
+ def dropFromMemory(
+ blockId: BlockId,
+ data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+ dropFromMemory(blockId, () => data)
+ }
+
/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
+ * If `data` is not put on disk, it won't be created.
+ *
* Return the block status if the given block has been updated, else None.
*/
def dropFromMemory(
blockId: BlockId,
- data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+ data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
- data match {
+ data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
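Worth noting in the hunk above: the old strict signature of `dropFromMemory` is kept as a thin alias that wraps its argument in a thunk and delegates to the new `() => Either[...]` overload, so existing callers compile unchanged. A `() => T` parameter is used rather than a by-name `=> T`, likely because Scala rejects overloading a method on a by-name versus a strict parameter of the same type. A hedged sketch of the alias pattern (the `Store` class and `drop` method here are hypothetical, not Spark code):

```scala
import java.nio.ByteBuffer

class Store {
  // Primary signature: the data is produced on demand, so it is never
  // materialized unless the block actually has to be written somewhere.
  def drop(id: String, data: () => Either[Array[Any], ByteBuffer]): Unit = {
    val mustSpillToDisk = true // placeholder for the real storage-level check
    if (mustSpillToDisk) data() match {
      case Left(elements) => println(s"spilling ${elements.length} elements of $id")
      case Right(bytes)   => println(s"spilling ${bytes.limit} bytes of $id")
    }
  }

  // Strict alias: wraps the already-materialized value in a Function0 and
  // delegates, preserving the old call shape for existing callers.
  def drop(id: String, data: Either[Array[Any], ByteBuffer]): Unit =
    drop(id, () => data)
}
```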
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1be860aea6..ed609772e6 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
+ /**
+ * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+ * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+ *
+ * The caller should guarantee that `size` is correct.
+ */
+ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+ // Work on a duplicate - since the original input might be used elsewhere.
+ lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+ val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+ val data =
+ if (putAttempt.success) {
+ assert(bytes.limit == size)
+ Right(bytes.duplicate())
+ } else {
+ null
+ }
+ PutResult(size, data, putAttempt.droppedBlocks)
+ }
+
override def putArray(
blockId: BlockId,
values: Array[Any],
@@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
blockId.asRDDId.map(_.rddId)
}
+ private def tryToPut(
+ blockId: BlockId,
+ value: Any,
+ size: Long,
+ deserialized: Boolean): ResultWithDroppedBlocks = {
+ tryToPut(blockId, () => value, size, deserialized)
+ }
+
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
+ * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
+ * created to avoid OOM since it may be a big ByteBuffer.
+ *
* Synchronize on `accountingLock` to ensure that each put request and its associated block
* dropping is done by only one thread at a time. Otherwise, while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*/
private def tryToPut(
blockId: BlockId,
- value: Any,
+ value: () => Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {
@@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
droppedBlocks ++= freeSpaceResult.droppedBlocks
if (enoughFreeSpace) {
- val entry = new MemoryEntry(value, size, deserialized)
+ val entry = new MemoryEntry(value(), size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
@@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
- val data = if (deserialized) {
- Left(value.asInstanceOf[Array[Any]])
+ lazy val data = if (deserialized) {
+ Left(value().asInstanceOf[Array[Any]])
} else {
- Right(value.asInstanceOf[ByteBuffer].duplicate())
+ Right(value().asInstanceOf[ByteBuffer].duplicate())
}
- val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+ val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
// Release the unroll memory used because we no longer need the underlying Array
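Also notable in this file: both `putBytes` and `tryToPut` combine a `lazy val` with a `() => T` wrapper (`lazy val bytes = _bytes()...` is handed on as `() => bytes`), which yields a thunk whose body runs at most once no matter how many times or from how many places it is invoked. A small sketch of that idiom in isolation (`atMostOnce` and `makeBig` are illustrative helpers, not Spark code):

```scala
import java.nio.ByteBuffer

// Wrap a thunk so its body is evaluated at most once: never if the result
// is never requested, exactly once otherwise (later calls hit the cache).
def atMostOnce[T](compute: () => T): () => T = {
  lazy val cached = compute()
  () => cached
}

val makeBig: () => ByteBuffer = atMostOnce { () =>
  println("allocating")    // prints at most once across all calls
  ByteBuffer.allocate(16)  // stand-in for a potentially huge buffer
}

// makeBig may now be handed to several consumers (as the patch does with
// tryToPut and dropFromMemory) without risking a second large allocation.
assert(makeBig() eq makeBig()) // same instance: the body ran only once
```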
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 3fdbe99b5d..ecd1cba5b5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -170,8 +170,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
- store.dropFromMemory("a1", null)
- store.dropFromMemory("a2", null)
+ store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -413,8 +413,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
t2.join()
t3.join()
- store.dropFromMemory("a1", null)
- store.dropFromMemory("a2", null)
+ store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
store.waitForAsyncReregister()
}
}
@@ -1223,4 +1223,30 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
}
+
+ test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
+ store = makeBlockManager(12000)
+ val memoryStore = store.memoryStore
+ val blockId = BlockId("rdd_3_10")
+ val result = memoryStore.putBytes(blockId, 13000, () => {
+ fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
+ })
+ assert(result.size === 13000)
+ assert(result.data === null)
+ assert(result.droppedBlocks === Nil)
+ }
+
+ test("put a small ByteBuffer to MemoryStore") {
+ store = makeBlockManager(12000)
+ val memoryStore = store.memoryStore
+ val blockId = BlockId("rdd_3_10")
+ var bytes: ByteBuffer = null
+ val result = memoryStore.putBytes(blockId, 10000, () => {
+ bytes = ByteBuffer.allocate(10000)
+ bytes
+ })
+ assert(result.size === 10000)
+ assert(result.data === Right(bytes))
+ assert(result.droppedBlocks === Nil)
+ }
}