about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authortedyu <yuzhihong@gmail.com>2015-12-08 10:01:44 -0800
committerShixiong Zhu <shixiong@databricks.com>2015-12-08 10:01:44 -0800
commit75c60bf4ba91e45e76a6e27f054a1c550eb6ff94 (patch)
tree082a180b36dbb8fc5b9aea9d9aff7d9b3a36abee /core
parent6cb06e8711fd6ac10c57faeb94bc323cae1cef27 (diff)
downloadspark-75c60bf4ba91e45e76a6e27f054a1c550eb6ff94.tar.gz
spark-75c60bf4ba91e45e76a6e27f054a1c550eb6ff94.tar.bz2
spark-75c60bf4ba91e45e76a6e27f054a1c550eb6ff94.zip
[SPARK-12074] Avoid memory copy involving ByteBuffer.wrap(ByteArrayOutputStream.toByteArray)
SPARK-12060 fixed JavaSerializerInstance.serialize. This PR applies the same technique on two other classes. zsxwing Author: tedyu <yuzhihong@gmail.com> Closes #10177 from tedyu/master.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala4
3 files changed, 8 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5fe5ae8c45..d4bc3a5c90 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -27,8 +27,7 @@ import org.apache.spark.{Accumulator, SparkEnv, TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
/**
@@ -172,7 +171,7 @@ private[spark] object Task {
serializer: SerializerInstance)
: ByteBuffer = {
- val out = new ByteArrayOutputStream(4096)
+ val out = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(out)
// Write currentFiles
@@ -193,7 +192,7 @@ private[spark] object Task {
dataOut.flush()
val taskBytes = serializer.serialize(task)
Utils.writeByteBuffer(taskBytes, out)
- ByteBuffer.wrap(out.toByteArray)
+ out.toByteBuffer
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007fb78..ed05143877 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1202,9 +1202,9 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
- val byteStream = new ByteArrayOutputStream(4096)
+ val byteStream = new ByteBufferOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer)
- ByteBuffer.wrap(byteStream.toByteArray)
+ byteStream.toByteBuffer
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
index 92e45224db..8527e3ae69 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -23,7 +23,9 @@ import java.nio.ByteBuffer
/**
* Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
*/
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutputStream(capacity) {
+
+ def this() = this(32)
def toByteBuffer: ByteBuffer = {
return ByteBuffer.wrap(buf, 0, count)