diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index c643c4b63c..fb4706e78d 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -41,6 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { require(chunks.forall(_.limit() > 0), "chunks must be non-empty") require(chunks.forall(_.position() == 0), "chunks' positions must be 0") + private[this] var disposed: Boolean = false + /** * This size of this buffer, in bytes. */ @@ -117,11 +119,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { /** * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers. * The new buffer will share no resources with the original buffer. + * + * @param allocator a method for allocating byte buffers */ - def copy(): ChunkedByteBuffer = { + def copy(allocator: Int => ByteBuffer): ChunkedByteBuffer = { val copiedChunks = getChunks().map { chunk => - // TODO: accept an allocator in this copy method to integrate with mem. accounting systems - val newChunk = ByteBuffer.allocate(chunk.limit()) + val newChunk = allocator(chunk.limit()) newChunk.put(chunk) newChunk.flip() newChunk @@ -136,7 +139,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { * unfortunately no standard API to do this. */ def dispose(): Unit = { - chunks.foreach(StorageUtils.dispose) + if (!disposed) { + chunks.foreach(StorageUtils.dispose) + disposed = true + } } } |