Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageUtils.scala')
 core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 52 +++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 45 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index e12f2e6095..5efdd23f79 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
 
+  // Ewwww... Reflection!!! See the unmap method for justification
+  private val memoryMappedBufferFileDescriptorField = {
+    val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+    val fdField = mappedBufferClass.getDeclaredField("fd")
+    fdField.setAccessible(true)
+    fdField
+  }
+
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+   * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+   * the off-heap bytes allocated to direct buffers nor the file descriptors opened for
+   * memory-mapped buffers put pressure on the garbage collector. Waiting for garbage collection
+   * may therefore lead to the depletion of off-heap memory or huge numbers of open files. There's
+   * unfortunately no standard API to manually dispose of these kinds of buffers.
+   *
+   * See also [[unmap]]
    */
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      logTrace(s"Disposing of $buffer")
+      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+    }
+  }
+
+  /**
+   * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+   * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors
+   * of memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+   * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+   * manually unmap memory-mapped buffers.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      // Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8,
+      // the JDK does not provide a public API to distinguish between direct buffers and
+      // memory-mapped buffers. As an alternative, we peek beneath the curtains and look for a
+      // non-null file descriptor in the buffer.
+      if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+        logTrace(s"Unmapping $buffer")
+        cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
       }
     }
   }
 
+  private def cleanDirectBuffer(buffer: DirectBuffer): Unit = {
+    val cleaner = buffer.cleaner()
+    if (cleaner != null) {
+      cleaner.clean()
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfos.
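
The hunk above distinguishes plain direct buffers from true file mappings by reflecting on MappedByteBuffer's private fd field, then frees either kind eagerly through the Sun-internal Cleaner. Below is a minimal, standalone sketch of the same idea, assuming a Java 8 JVM; the object name and the /tmp/buffer-sketch.bin path are hypothetical, not part of this commit:

// Standalone sketch of the fd-field reflection and Cleaner-based disposal
// used in the diff above. Assumes Java 8; names and paths are made up.
import java.io.RandomAccessFile
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel

import sun.nio.ch.DirectBuffer

object BufferDisposalSketch {
  // Same reflective lookup as StorageUtils: MappedByteBuffer's private "fd"
  // field is non-null only for buffers backed by an actual file mapping.
  private val fdField = {
    val f = classOf[MappedByteBuffer].getDeclaredField("fd")
    f.setAccessible(true)
    f
  }

  // Same idea as cleanDirectBuffer above: ask the internal Cleaner to free
  // the buffer's resources now instead of waiting for a garbage collection.
  private def clean(buffer: DirectBuffer): Unit = {
    val cleaner = buffer.cleaner()
    if (cleaner != null) {
      cleaner.clean()
    }
  }

  def main(args: Array[String]): Unit = {
    // A plain direct buffer extends MappedByteBuffer but carries no fd, so
    // unmap-style code skips it while dispose-style code frees it.
    val direct = ByteBuffer.allocateDirect(4096)
    println(s"direct buffer has fd: ${fdField.get(direct) != null}") // false
    clean(direct.asInstanceOf[DirectBuffer])

    // A genuinely memory-mapped buffer carries a non-null fd.
    val file = new RandomAccessFile("/tmp/buffer-sketch.bin", "rw")
    try {
      file.setLength(4096)
      val mapped = file.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096)
      println(s"mapped buffer has fd: ${fdField.get(mapped) != null}") // true
      clean(mapped.asInstanceOf[DirectBuffer])
      // Reading from `mapped` after this point may crash the JVM -- this is
      // the "unsafe" behavior the scaladoc warns about.
    } finally {
      file.close()
    }
  }
}

Note that the sketch, like the commit itself, leans on Java 8 internals: on Java 9 and later the reflective read of fd is blocked by the module system unless java.base/java.nio is opened, and sun.misc.Cleaner moved to an internal JDK package, which is why the comment in unmap scopes the trick to Java 8.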