From 75c60bf4ba91e45e76a6e27f054a1c550eb6ff94 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 8 Dec 2015 10:01:44 -0800 Subject: [SPARK-12074] Avoid memory copy involving ByteBuffer.wrap(ByteArrayOutputStream.toByteArray) SPARK-12060 fixed JavaSerializerInstance.serialize This PR applies the same technique on two other classes. zsxwing Author: tedyu Closes #10177 from tedyu/master. --- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 7 +++---- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 4 ++-- .../main/scala/org/apache/spark/util/ByteBufferOutputStream.scala | 4 +++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 5fe5ae8c45..d4bc3a5c90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -27,8 +27,7 @@ import org.apache.spark.{Accumulator, SparkEnv, TaskContextImpl, TaskContext} import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.ByteBufferInputStream -import org.apache.spark.util.Utils +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} /** @@ -172,7 +171,7 @@ private[spark] object Task { serializer: SerializerInstance) : ByteBuffer = { - val out = new ByteArrayOutputStream(4096) + val out = new ByteBufferOutputStream(4096) val dataOut = new DataOutputStream(out) // Write currentFiles @@ -193,7 +192,7 @@ private[spark] object Task { dataOut.flush() val taskBytes = serializer.serialize(task) Utils.writeByteBuffer(taskBytes, out) - ByteBuffer.wrap(out.toByteArray) + out.toByteBuffer } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index ab0007fb78..ed05143877 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1202,9 +1202,9 @@ private[spark] class BlockManager( blockId: BlockId, values: Iterator[Any], serializer: Serializer = defaultSerializer): ByteBuffer = { - val byteStream = new ByteArrayOutputStream(4096) + val byteStream = new ByteBufferOutputStream(4096) dataSerializeStream(blockId, byteStream, values, serializer) - ByteBuffer.wrap(byteStream.toByteArray) + byteStream.toByteBuffer } /** diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala index 92e45224db..8527e3ae69 100644 --- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala +++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala @@ -23,7 +23,9 @@ import java.nio.ByteBuffer /** * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer */ -private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream { +private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutputStream(capacity) { + + def this() = this(32) def toByteBuffer: ByteBuffer = { return ByteBuffer.wrap(buf, 0, count) -- cgit v1.2.3