aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala14
1 files changed, 10 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index c643c4b63c..fb4706e78d 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -41,6 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+ private[this] var disposed: Boolean = false
+
/**
* This size of this buffer, in bytes.
*/
@@ -117,11 +119,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
/**
* Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers.
* The new buffer will share no resources with the original buffer.
+ *
+ * @param allocator a method for allocating byte buffers
*/
- def copy(): ChunkedByteBuffer = {
+ def copy(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
val copiedChunks = getChunks().map { chunk =>
- // TODO: accept an allocator in this copy method to integrate with mem. accounting systems
- val newChunk = ByteBuffer.allocate(chunk.limit())
+ val newChunk = allocator(chunk.limit())
newChunk.put(chunk)
newChunk.flip()
newChunk
@@ -136,7 +139,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* unfortunately no standard API to do this.
*/
def dispose(): Unit = {
- chunks.foreach(StorageUtils.dispose)
+ if (!disposed) {
+ chunks.foreach(StorageUtils.dispose)
+ disposed = true
+ }
}
}