aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMichael Allman <michael@videoamp.com>2017-03-21 11:51:22 +0800
committerWenchen Fan <wenchen@databricks.com>2017-03-21 11:51:22 +0800
commit7fa116f8fc77906202217c0cd2f9718a4e62632b (patch)
tree1173f862ca72cbb6d2e4e0ce32306fea42ad9e24 /core
parent0ec1db5475f1a7839bdbf0d9cffe93ce6970a7fe (diff)
downloadspark-7fa116f8fc77906202217c0cd2f9718a4e62632b.tar.gz
spark-7fa116f8fc77906202217c0cd2f9718a4e62632b.tar.bz2
spark-7fa116f8fc77906202217c0cd2f9718a4e62632b.zip
[SPARK-17204][CORE] Fix replicated off heap storage
(Jira: https://issues.apache.org/jira/browse/SPARK-17204) ## What changes were proposed in this pull request? There are a couple of bugs in the `BlockManager` with respect to support for replicated off-heap storage. First, the locally-stored off-heap byte buffer is disposed of when it is replicated. It should not be. Second, the replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level memory mode is off-heap. This PR addresses both of these problems. ## How was this patch tested? `BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs in this PR exist. Author: Michael Allman <michael@videoamp.com> Closes #16499 from mallman/spark-17204-replicated_off_heap_storage.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageUtils.scala52
-rw-r--r--core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala27
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala20
5 files changed, 105 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 45b7338080..245d94ac4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -317,6 +317,9 @@ private[spark] class BlockManager(
/**
* Put the block locally, using the given storage level.
+ *
+ * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
*/
override def putBlockData(
blockId: BlockId,
@@ -755,6 +758,9 @@ private[spark] class BlockManager(
/**
* Put a new block of serialized bytes to the block manager.
*
+ * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
+ *
* @param encrypt If true, asks the block manager to encrypt the data block before storing,
* when I/O encryption is enabled. This is required for blocks that have been
* read from unencrypted sources, since all the BlockManager read APIs
@@ -773,7 +779,7 @@ private[spark] class BlockManager(
if (encrypt && securityManager.ioEncryptionKey.isDefined) {
try {
val data = bytes.toByteBuffer
- val in = new ByteBufferInputStream(data, true)
+ val in = new ByteBufferInputStream(data)
val byteBufOut = new ByteBufferOutputStream(data.remaining())
val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
securityManager.ioEncryptionKey.get)
@@ -800,6 +806,9 @@ private[spark] class BlockManager(
*
* If the block already exists, this method will not overwrite it.
*
+ * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
+ *
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
@@ -843,7 +852,15 @@ private[spark] class BlockManager(
false
}
} else {
- memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
+ val memoryMode = level.memoryMode
+ memoryStore.putBytes(blockId, size, memoryMode, () => {
+ if (memoryMode == MemoryMode.OFF_HEAP &&
+ bytes.chunks.exists(buffer => !buffer.isDirect)) {
+ bytes.copy(Platform.allocateDirectBuffer)
+ } else {
+ bytes
+ }
+ })
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
@@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
try {
replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
- bytesToReplicate.dispose()
+ bytesToReplicate.unmap()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index e12f2e6095..5efdd23f79 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
/** Helper methods for storage-related objects. */
private[spark] object StorageUtils extends Logging {
+ // Ewwww... Reflection!!! See the unmap method for justification
+ private val memoryMappedBufferFileDescriptorField = {
+ val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+ val fdField = mappedBufferClass.getDeclaredField("fd")
+ fdField.setAccessible(true)
+ fdField
+ }
/**
- * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
- * might cause errors if one attempts to read from the unmapped buffer, but it's better than
- * waiting for the GC to find it because that could lead to huge numbers of open files. There's
- * unfortunately no standard API to do this.
+ * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+ * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+ * the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
+ * pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
+ * off-heap memory or huge numbers of open files. There's unfortunately no standard API to
+ * manually dispose of these kinds of buffers.
+ *
+ * See also [[unmap]]
*/
def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
- logTrace(s"Unmapping $buffer")
- if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
- buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+ logTrace(s"Disposing of $buffer")
+ cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+ }
+ }
+
+ /**
+ * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+ * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
+ * memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+ * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+ * manually unmap memory-mapped buffers.
+ *
+ * See also [[dispose]]
+ */
+ def unmap(buffer: ByteBuffer): Unit = {
+ if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+ // Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8, the
+ // JDK does not provide a public API to distinguish between direct buffers and memory-mapped
+ // buffers. As an alternative, we peek beneath the curtains and look for a non-null file
+ // descriptor in mappedByteBuffer
+ if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+ logTrace(s"Unmapping $buffer")
+ cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
}
}
}
+ private def cleanDirectBuffer(buffer: DirectBuffer) = {
+ val cleaner = buffer.cleaner()
+ if (cleaner != null) {
+ cleaner.clean()
+ }
+ }
+
/**
* Update the given list of RDDInfo with the given list of storage statuses.
* This method overwrites the old values stored in the RDDInfo's.
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index dce2ac63a6..50dc948e6c 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -23,11 +23,10 @@ import java.nio.ByteBuffer
import org.apache.spark.storage.StorageUtils
/**
- * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
+ * Reads data from a ByteBuffer.
*/
private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
+class ByteBufferInputStream(private var buffer: ByteBuffer)
extends InputStream {
override def read(): Int = {
@@ -72,9 +71,6 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
*/
private def cleanUp() {
if (buffer != null) {
- if (dispose) {
- StorageUtils.dispose(buffer)
- }
buffer = null
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 7572cac393..1667516663 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -86,7 +86,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
}
/**
- * Copy this buffer into a new ByteBuffer.
+ * Convert this buffer to a ByteBuffer. If this buffer is backed by a single chunk, its underlying
+ * data will not be copied. Instead, it will be duplicated. If this buffer is backed by multiple
+ * chunks, the data underlying this buffer will be copied into a new byte buffer. As a result, it
+ * is suggested to use this method only if the caller does not need to manage the memory
+ * underlying this buffer.
*
* @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
*/
@@ -132,10 +136,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
}
/**
- * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
- * might cause errors if one attempts to read from the unmapped buffer, but it's better than
- * waiting for the GC to find it because that could lead to huge numbers of open files. There's
- * unfortunately no standard API to do this.
+ * Attempt to clean up any ByteBuffer in this ChunkedByteBuffer which is direct or memory-mapped.
+ * See [[StorageUtils.dispose]] for more information.
+ *
+ * See also [[unmap]]
*/
def dispose(): Unit = {
if (!disposed) {
@@ -143,6 +147,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
disposed = true
}
}
+
+ /**
+ * Attempt to unmap any ByteBuffer in this ChunkedByteBuffer if it is memory-mapped. See
+ * [[StorageUtils.unmap]] for more information.
+ *
+ * See also [[dispose]]
+ */
+ def unmap(): Unit = {
+ if (!disposed) {
+ chunks.foreach(StorageUtils.unmap)
+ disposed = true
+ }
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 75dc04038d..d907add920 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -374,7 +374,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Put the block into one of the stores
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
- stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+ val testValue = Array.fill[Byte](blockSize)(1)
+ stores(0).putSingle(blockId, testValue, storageLevel)
// Assert that master know two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -386,12 +387,23 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
testStore => blockLocations.contains(testStore.blockManagerId.executorId)
}.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
- assert(
- testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
- testStore.releaseLock(blockId)
+ val blockResultOpt = testStore.getLocalValues(blockId)
+ assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
+ val localValues = blockResultOpt.get.data.toSeq
+ assert(localValues.size == 1)
+ assert(localValues.head === testValue)
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")
+ val memoryStore = testStore.memoryStore
+ if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
+ memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
+ assert(storageLevel.useOffHeap == byteBuffer.isDirect,
+ s"memory mode ${storageLevel.memoryMode} is not compatible with " +
+ byteBuffer.getClass.getSimpleName)
+ }
+ }
+
val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
// Assert that block status in the master for this store has expected storage level