diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 23 |
1 files changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 45b7338080..245d94ac4f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -317,6 +317,9 @@ private[spark] class BlockManager( /** * Put the block locally, using the given storage level. + * + * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing + * so may corrupt or change the data stored by the `BlockManager`. */ override def putBlockData( blockId: BlockId, @@ -755,6 +758,9 @@ private[spark] class BlockManager( /** * Put a new block of serialized bytes to the block manager. * + * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing + * so may corrupt or change the data stored by the `BlockManager`. + * * @param encrypt If true, asks the block manager to encrypt the data block before storing, * when I/O encryption is enabled. This is required for blocks that have been * read from unencrypted sources, since all the BlockManager read APIs @@ -773,7 +779,7 @@ private[spark] class BlockManager( if (encrypt && securityManager.ioEncryptionKey.isDefined) { try { val data = bytes.toByteBuffer - val in = new ByteBufferInputStream(data, true) + val in = new ByteBufferInputStream(data) val byteBufOut = new ByteBufferOutputStream(data.remaining()) val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf, securityManager.ioEncryptionKey.get) @@ -800,6 +806,9 @@ private[spark] class BlockManager( * * If the block already exists, this method will not overwrite it. * + * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing + * so may corrupt or change the data stored by the `BlockManager`. + * * @param keepReadLock if true, this method will hold the read lock when it returns (even if the * block already exists). If false, this method will hold no locks when it * returns. @@ -843,7 +852,15 @@ private[spark] class BlockManager( false } } else { - memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes) + val memoryMode = level.memoryMode + memoryStore.putBytes(blockId, size, memoryMode, () => { + if (memoryMode == MemoryMode.OFF_HEAP && + bytes.chunks.exists(buffer => !buffer.isDirect)) { + bytes.copy(Platform.allocateDirectBuffer) + } else { + bytes + } + }) } if (!putSucceeded && level.useDisk) { logWarning(s"Persisting block $blockId to disk instead.") @@ -1048,7 +1065,7 @@ private[spark] class BlockManager( try { replicate(blockId, bytesToReplicate, level, remoteClassTag) } finally { - bytesToReplicate.dispose() + bytesToReplicate.unmap() } logDebug("Put block %s remotely took %s" .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) |