Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageUtils.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 52
1 file changed, 45 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index e12f2e6095..5efdd23f79 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+  // Ewwww... Reflection!!! See the unmap method for justification
+  private val memoryMappedBufferFileDescriptorField = {
+    val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+    val fdField = mappedBufferClass.getDeclaredField("fd")
+    fdField.setAccessible(true)
+    fdField
+  }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+   * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+   * the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
+   * pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
+   * off-heap memory or huge numbers of open files. There's unfortunately no standard API to
+   * manually dispose of these kinds of buffers.
+   *
+   * See also [[unmap]]
    */
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      logTrace(s"Disposing of $buffer")
+      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+    }
+  }
+
+  /**
+   * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+   * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
+   * memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+   * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+   * manually unmap memory-mapped buffers.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      // Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8, the
+      // JDK does not provide a public API to distinguish between direct buffers and memory-mapped
+      // buffers. As an alternative, we peek beneath the curtains and look for a non-null file
+      // descriptor in mappedByteBuffer
+      if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+        logTrace(s"Unmapping $buffer")
+        cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
       }
     }
   }
 
+  private def cleanDirectBuffer(buffer: DirectBuffer) = {
+    val cleaner = buffer.cleaner()
+    if (cleaner != null) {
+      cleaner.clean()
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfo's.
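
For readers who want to see the reflection trick in isolation, the following is a minimal, self-contained Scala sketch; it is not part of the commit. It assumes a Java 8 runtime: the private "fd" field of java.nio.MappedByteBuffer is a JDK internal, and on Java 9+ the reflective access may require --add-opens java.base/java.nio=ALL-UNNAMED or fail outright. The object name FdFieldDemo, the helper isMemoryMapped, and the temporary file are illustrative only.

import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel.MapMode

object FdFieldDemo {
  // Reflective handle on MappedByteBuffer's private "fd" field, mirroring the patch (JDK 8 internal).
  private val fdField = {
    val f = classOf[MappedByteBuffer].getDeclaredField("fd")
    f.setAccessible(true)
    f
  }

  // A buffer is truly memory-mapped only if it carries a file descriptor;
  // plain direct buffers also extend MappedByteBuffer, but their fd is null.
  def isMemoryMapped(buffer: ByteBuffer): Boolean =
    buffer.isInstanceOf[MappedByteBuffer] && fdField.get(buffer) != null

  def main(args: Array[String]): Unit = {
    val direct = ByteBuffer.allocateDirect(16)               // off-heap, not file-backed

    val file = File.createTempFile("fd-demo", ".bin")
    file.deleteOnExit()
    val channel = new RandomAccessFile(file, "rw").getChannel
    val mapped = channel.map(MapMode.READ_WRITE, 0, 16)      // file-backed mapping

    println(s"direct buffer memory-mapped? ${isMemoryMapped(direct)}") // false
    println(s"mapped buffer memory-mapped? ${isMemoryMapped(mapped)}") // true

    channel.close()
  }
}

The plain direct buffer reports false because allocateDirect never attaches a file descriptor, which is the property the patch's unmap method relies on to skip buffers that dispose should handle instead.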