aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index e5e6a9e4a8..632b0ae9c2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -228,12 +228,12 @@ private object TorrentBroadcast extends Logging {
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
- val bos = new ByteArrayChunkOutputStream(blockSize)
- val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
+ val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
+ val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
val ser = serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
- bos.toArrays.map(ByteBuffer.wrap)
+ cbbos.toChunkedByteBuffer.getChunks()
}
def unBlockifyObject[T: ClassTag](