Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  23
1 file changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 45b7338080..245d94ac4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -317,6 +317,9 @@ private[spark] class BlockManager(
/**
* Put the block locally, using the given storage level.
+ *
+ * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
*/
override def putBlockData(
blockId: BlockId,
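
The added scaladoc spells out a sharing contract: the block manager may keep a reference to the caller's buffers instead of copying them. The following standalone sketch (plain java.nio, hypothetical names, not Spark's API) shows why a caller-side mutation after the put would silently change the stored block:

import java.nio.ByteBuffer
import scala.collection.mutable

object AliasedPutSketch {
  // Hypothetical zero-copy store: it retains the caller's buffer as-is.
  private val store = mutable.Map.empty[String, ByteBuffer]

  def putBlockData(blockId: String, bytes: ByteBuffer): Unit = {
    store(blockId) = bytes
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.wrap("hello".getBytes("UTF-8"))
    putBlockData("rdd_0_0", buf)
    buf.put(0, 'X'.toByte)                    // caller mutates the buffer after the put...
    println(store("rdd_0_0").get(0).toChar)   // ...and the "stored" block now starts with 'X'
  }
}
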
@@ -755,6 +758,9 @@ private[spark] class BlockManager(
/**
* Put a new block of serialized bytes to the block manager.
*
+ * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
+ *
* @param encrypt If true, asks the block manager to encrypt the data block before storing,
* when I/O encryption is enabled. This is required for blocks that have been
* read from unencrypted sources, since all the BlockManager read APIs
@@ -773,7 +779,7 @@ private[spark] class BlockManager(
if (encrypt && securityManager.ioEncryptionKey.isDefined) {
try {
val data = bytes.toByteBuffer
- val in = new ByteBufferInputStream(data, true)
+ val in = new ByteBufferInputStream(data)
val byteBufOut = new ByteBufferOutputStream(data.remaining())
val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
securityManager.ioEncryptionKey.get)
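
Dropping the second constructor argument switches the encryption path to a stream that does not free the caller's buffer when it is closed, in line with the contract above. As a rough illustration only (this is not Spark's ByteBufferInputStream, and the meaning of the removed boolean is an assumption), a non-owning ByteBuffer-backed InputStream looks like this:

import java.io.InputStream
import java.nio.ByteBuffer

// Illustrative only: a ByteBuffer-backed InputStream that never frees the
// buffer it wraps; the removed `true` presumably enabled dispose-on-close.
class NonOwningByteBufferInputStream(buffer: ByteBuffer) extends InputStream {
  override def read(): Int =
    if (!buffer.hasRemaining) -1 else buffer.get() & 0xFF

  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
    if (!buffer.hasRemaining) {
      -1
    } else {
      val amount = math.min(length, buffer.remaining())
      buffer.get(dest, offset, amount)
      amount
    }
  }

  // close() intentionally does nothing: the caller still owns `buffer`.
  override def close(): Unit = ()
}
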
@@ -800,6 +806,9 @@ private[spark] class BlockManager(
*
* If the block already exists, this method will not overwrite it.
*
+ * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
+ *
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
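
For illustration, the keepReadLock contract can be sketched with a plain ReentrantReadWriteLock (hypothetical class, not Spark's lock manager): the method exits holding the read lock when asked to, whether or not the value was already present.

import java.util.concurrent.locks.ReentrantReadWriteLock

class KeepReadLockSketch[V](compute: () => V) {
  private val lock = new ReentrantReadWriteLock()
  private var value: Option[V] = None

  /** Returns true if this call stored the value. If `keepReadLock` is true,
   *  the caller holds the read lock on return and must release it later. */
  def putIfAbsent(keepReadLock: Boolean): Boolean = {
    lock.writeLock().lock()
    try {
      val stored =
        if (value.isEmpty) { value = Some(compute()); true } else false
      // Lock downgrade: acquire the read lock before releasing the write lock,
      // so the caller leaves this method already holding the read lock.
      if (keepReadLock) lock.readLock().lock()
      stored
    } finally {
      lock.writeLock().unlock()
    }
  }

  def releaseReadLock(): Unit = lock.readLock().unlock()
}
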
@@ -843,7 +852,15 @@ private[spark] class BlockManager(
false
}
} else {
- memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
+ val memoryMode = level.memoryMode
+ memoryStore.putBytes(blockId, size, memoryMode, () => {
+ if (memoryMode == MemoryMode.OFF_HEAP &&
+ bytes.chunks.exists(buffer => !buffer.isDirect)) {
+ bytes.copy(Platform.allocateDirectBuffer)
+ } else {
+ bytes
+ }
+ })
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
@@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
try {
replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
- bytesToReplicate.dispose()
+ bytesToReplicate.unmap()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
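
Switching from dispose() to unmap() narrows what gets freed after replication: the replicated bytes may share buffers with the block store (or with the caller, per the contract above), so only chunks that were memory-mapped from disk are safe to clean up here. A rough sketch of the distinction (hypothetical helpers, not ChunkedByteBuffer's implementation):

import java.nio.{ByteBuffer, MappedByteBuffer}

object UnmapSketch {
  // Hypothetical cleaner: actually releasing direct or mapped memory needs
  // JDK-internal, platform-specific code, which is elided here.
  private def free(buffer: ByteBuffer): Unit = ()

  // unmap-style cleanup: touch only chunks that came from memory-mapping a file.
  def unmap(chunks: Seq[ByteBuffer]): Unit =
    chunks.foreach {
      case mapped: MappedByteBuffer => free(mapped)
      case _ => () // shared direct buffers are left for their real owner
    }

  // dispose-style cleanup: frees every direct chunk, including ones the
  // block store may still be using, which is the risk this hunk avoids.
  def dispose(chunks: Seq[ByteBuffer]): Unit =
    chunks.filter(_.isDirect).foreach(free)
}
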